Skip to content

Commit

Permalink
Get FileSerializer back
Browse files Browse the repository at this point in the history
  • Loading branch information
zbx1425 committed Dec 26, 2024
1 parent de5846d commit 6f4e637
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import cn.zbx1425.scatteredshards.ServerConfig;
import cn.zbx1425.scatteredshards.sync.RedisSynchronizer;
import cn.zbx1425.scatteredshards.sync.SyncDispatcher;
import cn.zbx1425.scatteredshards.sync.SyncPersistDispatcher;
import cn.zbx1425.scatteredshards.sync.Synchronizer;
import io.github.cottonmc.cotton.gui.impl.LibGuiCommon;
import net.modfest.scatteredshards.ScatteredShards;
import net.modfest.scatteredshards.api.ScatteredShardsAPI;
import net.modfest.scatteredshards.load.ShardTypeLoader;
import net.neoforged.bus.api.IEventBus;
import net.neoforged.bus.api.SubscribeEvent;
Expand All @@ -16,8 +18,6 @@
import net.neoforged.neoforge.event.server.ServerStartingEvent;
import net.neoforged.neoforge.event.server.ServerStoppingEvent;

import java.io.IOException;

@Mod(ScatteredShards.ID)
public class ScatteredShardsNeoForge {

Expand Down Expand Up @@ -46,22 +46,29 @@ public static void onServerStarting(ServerStartingEvent event) {
try {
SERVER_CONFIG.load(event.getServer().getRunDirectory()
.resolve("config").resolve("scattered_shards.json"));
SyncDispatcher.INSTANCE = new SyncDispatcher(event.getServer(),
SERVER_CONFIG.syncRole.value.equalsIgnoreCase("host"));
Synchronizer peerChannel;
if (!SERVER_CONFIG.redisUrl.value.isEmpty()) {
SyncDispatcher.INSTANCE.peerChannel = new RedisSynchronizer(SERVER_CONFIG.redisUrl.value);
peerChannel = new RedisSynchronizer(SERVER_CONFIG.redisUrl.value);
} else {
peerChannel = null;
}
} catch (IOException e) {
ScatteredShards.LOGGER.error("Failed to load server config", e);
SyncPersistDispatcher.CURRENT = new SyncPersistDispatcher(
event.getServer(),
SERVER_CONFIG.syncRole.value.equalsIgnoreCase("host"),
peerChannel
);
SyncPersistDispatcher.CURRENT.loadFromToShareOrDiskAndInto(ScatteredShardsAPI.exportServerCollections());
} catch (Exception e) {
ScatteredShards.LOGGER.error("Failed to use server config", e);
}
}

@SubscribeEvent
public static void onServerStopping(ServerStoppingEvent event) {
try {
if (SyncDispatcher.INSTANCE != null) {
SyncDispatcher.INSTANCE.close();
SyncDispatcher.INSTANCE = null;
if (SyncPersistDispatcher.CURRENT != null) {
SyncPersistDispatcher.CURRENT.close();
SyncPersistDispatcher.CURRENT = null;
}
} catch (Exception e) {
ScatteredShards.LOGGER.error("Failed to close sync dispatcher", e);
Expand Down
62 changes: 62 additions & 0 deletions src/main/java/cn/zbx1425/scatteredshards/sync/FileSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package cn.zbx1425.scatteredshards.sync;

import net.minecraft.util.Identifier;
import net.modfest.scatteredshards.ScatteredShards;
import net.modfest.scatteredshards.api.ShardCollection;
import net.modfest.scatteredshards.api.impl.ShardCollectionImpl;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

public class FileSerializer {

private final Path basePath;

public FileSerializer(Path basePath) {
this.basePath = basePath;
}

public void loadInto(Map<UUID, ShardCollection> playerShardCollections) throws IOException {
playerShardCollections.clear();
try {
Files.createDirectories(basePath.resolve("collections"));
} catch (Exception ignored) { }
try (Stream<Path> userFiles = Files.list(basePath.resolve("collections"))) {
for (Path userFile : userFiles.toList()) {
try {
String[] fileNameParts = userFile.getFileName().toString().split("\\.");
if (fileNameParts.length != 2 || !fileNameParts[1].equals("txt")) continue;
ShardCollection collection = new ShardCollectionImpl();
for (String line : Files.readAllLines(userFile)) {
if (line.isEmpty()) continue;
Identifier shardId = Identifier.tryParse(line);
if (shardId != null) collection.add(shardId);
}
playerShardCollections.put(UUID.fromString(fileNameParts[0]), collection);
} catch (IOException ex) {
ScatteredShards.LOGGER.error("Failed to load shard collections from disk for file " + userFile.getFileName(), ex);
}
}
}
}

private Path getUserPath(UUID bearer) {
return basePath.resolve("collections")
.resolve(bearer.toString() + ".txt");
}

public void write(UUID bearer, ShardCollection existingEntry) throws IOException {
Path targetFile = getUserPath(bearer);
try (FileOutputStream fos = new FileOutputStream(targetFile.toFile())) {
for (Identifier shardId : existingEntry) {
fos.write((shardId.toString() + "\n").getBytes());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void writeAllToShare(Map<UUID, ShardCollection> collections) {
}

@Override
public void readAllFromShare(Map<UUID, ShardCollection> collections) {
public void readAllFromShareInto(Map<UUID, ShardCollection> collections) {
Map<String, String> data = redisConn.sync().hgetall(HMAP_ALL_KEY);
for (Map.Entry<String, String> entry : data.entrySet()) {
ShardCollection collection = new ShardCollectionImpl();
Expand All @@ -60,7 +60,6 @@ public void readAllFromShare(Map<UUID, ShardCollection> collections) {

@Override
public void notifyCollectionChange(UUID bearer, ShardCollection newEntry) {
if (!SyncDispatcher.INSTANCE.isHost) return;
StringBuilder data = new StringBuilder();
for (Identifier shard : newEntry) data.append(shard.toString()).append('\n');
redisConn.async().hset(HMAP_ALL_KEY, bearer.toString(), data.toString());
Expand Down
24 changes: 0 additions & 24 deletions src/main/java/cn/zbx1425/scatteredshards/sync/SyncDispatcher.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package cn.zbx1425.scatteredshards.sync;

import net.minecraft.server.MinecraftServer;
import net.minecraft.util.Identifier;
import net.minecraft.util.WorldSavePath;
import net.modfest.scatteredshards.ScatteredShards;
import net.modfest.scatteredshards.api.ShardCollection;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.UUID;

public class SyncPersistDispatcher implements Synchronizer {

public final MinecraftServer server;
public final Path basePath;

public boolean isHost;
public final Synchronizer peerChannel;
public final FileSerializer persistAccess;

public static SyncPersistDispatcher CURRENT;

public SyncPersistDispatcher(MinecraftServer server, boolean isHost, Synchronizer peerChannel) {
this.server = server;
this.basePath = Path.of(server.getSavePath(WorldSavePath.ROOT).toString(), "scattered_shards");
this.isHost = isHost;
this.peerChannel = peerChannel;
this.persistAccess = new FileSerializer(basePath);
}

public void loadFromToShareOrDiskAndInto(Map<UUID, ShardCollection> playerShardCollections) {
if (isHost) {
try {
persistAccess.loadInto(playerShardCollections);
} catch (IOException e) {
ScatteredShards.LOGGER.error("Failed to load shard collections from disk", e);
}
peerChannel.writeAllToShare(playerShardCollections);
} else {
peerChannel.readAllFromShareInto(playerShardCollections);
}
}

@Override
public void notifyCollectionChange(UUID bearer, ShardCollection newEntry) {
if (!isHost) return;
peerChannel.notifyCollectionChange(bearer, newEntry);
try {
persistAccess.write(bearer, newEntry);
} catch (IOException e) {
ScatteredShards.LOGGER.error("Failed to persist shard collection for player " + bearer, e);
}
}

@Override
public void notifyCollect(UUID bearer, Identifier shardId) {
peerChannel.notifyCollect(bearer, shardId);
}

@Override
public void notifyUncollect(UUID bearer, Identifier shardId) {
peerChannel.notifyUncollect(bearer, shardId);
}

@Override
public void writeAllToShare(Map<UUID, ShardCollection> collections) {
throw new UnsupportedOperationException();
}

@Override
public void readAllFromShareInto(Map<UUID, ShardCollection> collections) {
throw new UnsupportedOperationException();
}

@Override
public void close() throws Exception {
peerChannel.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface Synchronizer extends AutoCloseable {

void writeAllToShare(Map<UUID, ShardCollection> collections);

void readAllFromShare(Map<UUID, ShardCollection> collections);
void readAllFromShareInto(Map<UUID, ShardCollection> collections);

void notifyCollectionChange(UUID bearer, ShardCollection newEntry);

Expand All @@ -24,7 +24,7 @@ public void writeAllToShare(Map<UUID, ShardCollection> collections) {
}

@Override
public void readAllFromShare(Map<UUID, ShardCollection> collections) {
public void readAllFromShareInto(Map<UUID, ShardCollection> collections) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package net.modfest.scatteredshards.api;

import cn.zbx1425.scatteredshards.sync.SyncDispatcher;
import cn.zbx1425.scatteredshards.sync.SyncPersistDispatcher;
import com.mojang.blaze3d.systems.RenderSystem;
import net.fabricmc.api.EnvType;
import net.fabricmc.api.Environment;
Expand All @@ -11,7 +11,6 @@
import net.minecraft.util.Identifier;
import net.modfest.scatteredshards.ScatteredShards;
import net.modfest.scatteredshards.api.impl.ShardCollectionImpl;
import net.modfest.scatteredshards.api.impl.ShardCollectionPersistentState;
import net.modfest.scatteredshards.api.impl.ShardLibraryImpl;
import net.modfest.scatteredshards.networking.S2CUpdateShard;
import org.jetbrains.annotations.ApiStatus;
Expand All @@ -24,7 +23,6 @@ public class ScatteredShardsAPI {

public static final String MODIFY_SHARD_PERMISSION = ScatteredShards.permission("modify_shard");

private static ShardCollectionPersistentState collectionPersistentState;
private static final ShardLibrary serverShardLibrary = new ShardLibraryImpl();
private static Map<UUID, ShardCollection> serverCollections = new HashMap<>();
private static ShardLibrary clientShardLibrary = null;
Expand Down Expand Up @@ -60,7 +58,6 @@ public static ShardCollection getServerCollection(UUID uuid) {
if (collection == null) {
collection = new ShardCollectionImpl();
serverCollections.put(uuid, collection);
if (collectionPersistentState != null) collectionPersistentState.markDirty();
}
return collection;
}
Expand Down Expand Up @@ -121,14 +118,12 @@ public static void updateClientGlobalCollection(GlobalCollection collection) {
public static boolean triggerShardCollection(ServerPlayerEntity player, Identifier shardId) {
ShardCollection collection = getServerCollection(player);
if (collection.add(shardId)) {
if (player.getServer() != null) collectionPersistentState.markDirty();

serverGlobalCollection.update(shardId, 1, serverCollections.size());
ServerPlayNetworking.send(player, new S2CUpdateShard(shardId, S2CUpdateShard.Mode.COLLECT));

// Let our peers know this
SyncDispatcher.INSTANCE.peerChannel.notifyCollect(player.getUuid(), shardId);
SyncDispatcher.INSTANCE.peerChannel.notifyCollectionChange(player.getUuid(), collection);
SyncPersistDispatcher.CURRENT.notifyCollect(player.getUuid(), shardId);
SyncPersistDispatcher.CURRENT.notifyCollectionChange(player.getUuid(), collection);

return true;
} else {
Expand All @@ -140,48 +135,42 @@ public static boolean triggerShardCollection(ServerPlayerEntity player, Identifi
public static boolean triggerShardCollection(UUID uuid, Identifier shardId) {
ShardCollection collection = getServerCollection(uuid);
if (collection.add(shardId)) {
// TODO investigate what if (player.getServer() != null) actually does?
collectionPersistentState.markDirty();

serverGlobalCollection.update(shardId, 1, serverCollections.size());

// Amend the kv store, in case this instance is host
SyncDispatcher.INSTANCE.peerChannel.notifyCollectionChange(uuid, collection);
SyncPersistDispatcher.CURRENT.notifyCollectionChange(uuid, collection);

return true;
} else {
return false;
}
}

// This overload is about a player in this server
public static boolean triggerShardUncollection(ServerPlayerEntity player, Identifier shardId) {
ShardCollection collection = getServerCollection(player);
if (collection.remove(shardId)) {
if (player.getServer() != null) collectionPersistentState.markDirty();

serverGlobalCollection.update(shardId, -1, serverCollections.size());
ServerPlayNetworking.send(player, new S2CUpdateShard(shardId, S2CUpdateShard.Mode.UNCOLLECT));

// Let our peers know this
SyncDispatcher.INSTANCE.peerChannel.notifyUncollect(player.getUuid(), shardId);
SyncDispatcher.INSTANCE.peerChannel.notifyCollectionChange(player.getUuid(), collection);
SyncPersistDispatcher.CURRENT.notifyUncollect(player.getUuid(), shardId);
SyncPersistDispatcher.CURRENT.notifyCollectionChange(player.getUuid(), collection);

return true;
} else {
return false;
}
}

// This overload is about a player in another server, i.e. from peer channel
public static boolean triggerShardUncollection(UUID uuid, Identifier shardId) {
ShardCollection collection = getServerCollection(uuid);
if (collection.remove(shardId)) {
// TODO investigate what if (player.getServer() != null) actually does?
collectionPersistentState.markDirty();

serverGlobalCollection.update(shardId, -1, serverCollections.size());

// Amend the kv store, in case this instance is host
SyncDispatcher.INSTANCE.peerChannel.notifyCollectionChange(uuid, collection);
SyncPersistDispatcher.CURRENT.notifyCollectionChange(uuid, collection);

return true;
} else {
Expand Down Expand Up @@ -211,8 +200,4 @@ public static void initClient() {
clientShardLibrary = new ShardLibraryImpl();
clientShardCollection = new ShardCollectionImpl();
}

public static void register(ShardCollectionPersistentState persistentState) {
ScatteredShardsAPI.collectionPersistentState = persistentState;
}
}
Loading

0 comments on commit 6f4e637

Please sign in to comment.