Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebHost: use a limited process pool to run Rooms #3214

Merged
merged 4 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions MultiServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,13 @@ class Context:
all_item_and_group_names: typing.Dict[str, typing.Set[str]]
all_location_and_group_names: typing.Dict[str, typing.Set[str]]
non_hintable_names: typing.Dict[str, typing.Set[str]]
logger: logging.Logger

def __init__(self, host: str, port: int, server_password: str, password: str, location_check_points: int,
hint_cost: int, item_cheat: bool, release_mode: str = "disabled", collect_mode="disabled",
remaining_mode: str = "disabled", auto_shutdown: typing.SupportsFloat = 0, compatibility: int = 2,
log_network: bool = False):
log_network: bool = False, logger: logging.Logger = logging.getLogger()):
self.logger = logger
super(Context, self).__init__()
self.slot_info = {}
self.log_network = log_network
Expand Down Expand Up @@ -287,12 +289,12 @@ async def send_msgs(self, endpoint: Endpoint, msgs: typing.Iterable[dict]) -> bo
try:
await endpoint.socket.send(msg)
except websockets.ConnectionClosed:
logging.exception(f"Exception during send_msgs, could not send {msg}")
self.logger.exception(f"Exception during send_msgs, could not send {msg}")
await self.disconnect(endpoint)
return False
else:
if self.log_network:
logging.info(f"Outgoing message: {msg}")
self.logger.info(f"Outgoing message: {msg}")
return True

async def send_encoded_msgs(self, endpoint: Endpoint, msg: str) -> bool:
Expand All @@ -301,12 +303,12 @@ async def send_encoded_msgs(self, endpoint: Endpoint, msg: str) -> bool:
try:
await endpoint.socket.send(msg)
except websockets.ConnectionClosed:
logging.exception("Exception during send_encoded_msgs")
self.logger.exception("Exception during send_encoded_msgs")
await self.disconnect(endpoint)
return False
else:
if self.log_network:
logging.info(f"Outgoing message: {msg}")
self.logger.info(f"Outgoing message: {msg}")
return True

async def broadcast_send_encoded_msgs(self, endpoints: typing.Iterable[Endpoint], msg: str) -> bool:
Expand All @@ -317,11 +319,11 @@ async def broadcast_send_encoded_msgs(self, endpoints: typing.Iterable[Endpoint]
try:
websockets.broadcast(sockets, msg)
except RuntimeError:
logging.exception("Exception during broadcast_send_encoded_msgs")
self.logger.exception("Exception during broadcast_send_encoded_msgs")
return False
else:
if self.log_network:
logging.info(f"Outgoing broadcast: {msg}")
self.logger.info(f"Outgoing broadcast: {msg}")
return True

def broadcast_all(self, msgs: typing.List[dict]):
Expand All @@ -330,7 +332,7 @@ def broadcast_all(self, msgs: typing.List[dict]):
async_start(self.broadcast_send_encoded_msgs(endpoints, msgs))

def broadcast_text_all(self, text: str, additional_arguments: dict = {}):
logging.info("Notice (all): %s" % text)
self.logger.info("Notice (all): %s" % text)
self.broadcast_all([{**{"cmd": "PrintJSON", "data": [{ "text": text }]}, **additional_arguments}])

def broadcast_team(self, team: int, msgs: typing.List[dict]):
Expand All @@ -352,7 +354,7 @@ async def disconnect(self, endpoint: Client):
def notify_client(self, client: Client, text: str, additional_arguments: dict = {}):
if not client.auth:
return
logging.info("Notice (Player %s in team %d): %s" % (client.name, client.team + 1, text))
self.logger.info("Notice (Player %s in team %d): %s" % (client.name, client.team + 1, text))
async_start(self.send_msgs(client, [{"cmd": "PrintJSON", "data": [{ "text": text }], **additional_arguments}]))

def notify_client_multiple(self, client: Client, texts: typing.List[str], additional_arguments: dict = {}):
Expand Down Expand Up @@ -451,7 +453,7 @@ def _load(self, decoded_obj: dict, game_data_packages: typing.Dict[str, typing.A
for game_name, data in decoded_obj.get("datapackage", {}).items():
if game_name in game_data_packages:
data = game_data_packages[game_name]
logging.info(f"Loading embedded data package for game {game_name}")
self.logger.info(f"Loading embedded data package for game {game_name}")
self.gamespackage[game_name] = data
self.item_name_groups[game_name] = data["item_name_groups"]
if "location_name_groups" in data:
Expand Down Expand Up @@ -483,7 +485,7 @@ def _save(self, exit_save: bool = False) -> bool:
with open(self.save_filename, "wb") as f:
f.write(zlib.compress(encoded_save))
except Exception as e:
logging.exception(e)
self.logger.exception(e)
return False
else:
return True
Expand All @@ -501,9 +503,9 @@ def init_save(self, enabled: bool = True):
save_data = restricted_loads(zlib.decompress(f.read()))
self.set_save(save_data)
except FileNotFoundError:
logging.error('No save data found, starting a new game')
self.logger.error('No save data found, starting a new game')
except Exception as e:
logging.exception(e)
self.logger.exception(e)
self._start_async_saving()

def _start_async_saving(self):
Expand All @@ -520,11 +522,11 @@ def get_datetime_second():
next_wakeup = (second - get_datetime_second()) % self.auto_save_interval
time.sleep(max(1.0, next_wakeup))
if self.save_dirty:
logging.debug("Saving via thread.")
self.logger.debug("Saving via thread.")
self._save()
except OperationalError as e:
logging.exception(e)
logging.info(f"Saving failed. Retry in {self.auto_save_interval} seconds.")
self.logger.exception(e)
self.logger.info(f"Saving failed. Retry in {self.auto_save_interval} seconds.")
else:
self.save_dirty = False
self.auto_saver_thread = threading.Thread(target=save_regularly, daemon=True)
Expand Down Expand Up @@ -598,7 +600,7 @@ def set_save(self, savedata: dict):
if "stored_data" in savedata:
self.stored_data = savedata["stored_data"]
# count items and slots from lists for items_handling = remote
logging.info(
self.logger.info(
f'Loaded save file with {sum([len(v) for k, v in self.received_items.items() if k[2]])} received items '
f'for {sum(k[2] for k in self.received_items)} players')

Expand Down Expand Up @@ -640,13 +642,13 @@ def _set_options(self, server_options: dict):
try:
raise Exception(f"Could not set server option {key}, skipping.") from e
except Exception as e:
logging.exception(e)
logging.debug(f"Setting server option {key} to {value} from supplied multidata")
self.logger.exception(e)
self.logger.debug(f"Setting server option {key} to {value} from supplied multidata")
setattr(self, key, value)
elif key == "disable_item_cheat":
self.item_cheat = not bool(value)
else:
logging.debug(f"Unrecognized server option {key}")
self.logger.debug(f"Unrecognized server option {key}")

def get_aliased_name(self, team: int, slot: int):
if (team, slot) in self.name_aliases:
Expand Down Expand Up @@ -680,7 +682,7 @@ def notify_hints(self, team: int, hints: typing.List[NetUtils.Hint], only_new: b
self.hints[team, player].add(hint)
new_hint_events.add(player)

logging.info("Notice (Team #%d): %s" % (team + 1, format_hint(self, team, hint)))
self.logger.info("Notice (Team #%d): %s" % (team + 1, format_hint(self, team, hint)))
for slot in new_hint_events:
self.on_new_hint(team, slot)
for slot, hint_data in concerns.items():
Expand Down Expand Up @@ -739,21 +741,21 @@ async def server(websocket, path: str = "/", ctx: Context = None):

try:
if ctx.log_network:
logging.info("Incoming connection")
ctx.logger.info("Incoming connection")
await on_client_connected(ctx, client)
if ctx.log_network:
logging.info("Sent Room Info")
ctx.logger.info("Sent Room Info")
async for data in websocket:
if ctx.log_network:
logging.info(f"Incoming message: {data}")
ctx.logger.info(f"Incoming message: {data}")
for msg in decode(data):
await process_client_cmd(ctx, client, msg)
except Exception as e:
if not isinstance(e, websockets.WebSocketException):
logging.exception(e)
ctx.logger.exception(e)
finally:
if ctx.log_network:
logging.info("Disconnected")
ctx.logger.info("Disconnected")
await ctx.disconnect(client)


Expand Down Expand Up @@ -985,7 +987,7 @@ def register_location_checks(ctx: Context, team: int, slot: int, locations: typi
new_item = NetworkItem(item_id, location, slot, flags)
send_items_to(ctx, team, target_player, new_item)

logging.info('(Team #%d) %s sent %s to %s (%s)' % (
ctx.logger.info('(Team #%d) %s sent %s to %s (%s)' % (
team + 1, ctx.player_names[(team, slot)], ctx.item_names[item_id],
ctx.player_names[(team, target_player)], ctx.location_names[location]))
info_text = json_format_send_event(new_item, target_player)
Expand Down Expand Up @@ -1625,7 +1627,7 @@ async def process_client_cmd(ctx: Context, client: Client, args: dict):
try:
cmd: str = args["cmd"]
except:
logging.exception(f"Could not get command from {args}")
ctx.logger.exception(f"Could not get command from {args}")
await ctx.send_msgs(client, [{'cmd': 'InvalidPacket', "type": "cmd", "original_cmd": None,
"text": f"Could not get command from {args} at `cmd`"}])
raise
Expand Down Expand Up @@ -1668,7 +1670,7 @@ async def process_client_cmd(ctx: Context, client: Client, args: dict):
if ctx.compatibility == 0 and args['version'] != version_tuple:
errors.add('IncompatibleVersion')
if errors:
logging.info(f"A client connection was refused due to: {errors}, the sent connect information was {args}.")
ctx.logger.info(f"A client connection was refused due to: {errors}, the sent connect information was {args}.")
await ctx.send_msgs(client, [{"cmd": "ConnectionRefused", "errors": list(errors)}])
else:
team, slot = ctx.connect_names[args['name']]
Expand Down Expand Up @@ -2286,7 +2288,7 @@ def inactivity_shutdown():
if to_cancel:
for task in to_cancel:
task.cancel()
logging.info("Shutting down due to inactivity.")
ctx.logger.info("Shutting down due to inactivity.")

while not ctx.exit_event.is_set():
if not ctx.client_activity_timers.values():
Expand Down
1 change: 1 addition & 0 deletions WebHostLib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

app.config["SELFHOST"] = True # application process is in charge of running the websites
app.config["GENERATORS"] = 8 # maximum concurrent world gens
app.config["HOSTERS"] = 8 # maximum concurrent room hosters
app.config["SELFLAUNCH"] = True # application process is in charge of launching Rooms.
app.config["SELFLAUNCHCERT"] = None # can point to a SSL Certificate to encrypt Room websocket connections
app.config["SELFLAUNCHKEY"] = None # can point to a SSL Certificate Key to encrypt Room websocket connections
Expand Down
107 changes: 41 additions & 66 deletions WebHostLib/autolauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import json
import logging
import multiprocessing
import threading
import time
import typing
from uuid import UUID
Expand All @@ -15,16 +14,6 @@
from .locker import Locker, AlreadyRunningException


def launch_room(room: Room, config: dict):
# requires db_session!
if room.last_activity >= datetime.utcnow() - timedelta(seconds=room.timeout):
multiworld = multiworlds.get(room.id, None)
if not multiworld:
multiworld = MultiworldInstance(room, config)

multiworld.start()


def handle_generation_success(seed_id):
logging.info(f"Generation finished for seed {seed_id}")

Expand Down Expand Up @@ -59,29 +48,40 @@ def init_db(pony_config: dict):
db.generate_mapping()


def cleanup():
"""delete unowned user-content"""
with db_session:
# >>> bool(uuid.UUID(int=0))
# True
rooms = Room.select(lambda room: room.owner == UUID(int=0)).delete(bulk=True)
seeds = Seed.select(lambda seed: seed.owner == UUID(int=0) and not seed.rooms).delete(bulk=True)
slots = Slot.select(lambda slot: not slot.seed).delete(bulk=True)
# Command gets deleted by ponyorm Cascade Delete, as Room is Required
if rooms or seeds or slots:
logging.info(f"{rooms} Rooms, {seeds} Seeds and {slots} Slots have been deleted.")


def autohost(config: dict):
def keep_running():
try:
with Locker("autohost"):
# delete unowned user-content
with db_session:
# >>> bool(uuid.UUID(int=0))
# True
rooms = Room.select(lambda room: room.owner == UUID(int=0)).delete(bulk=True)
seeds = Seed.select(lambda seed: seed.owner == UUID(int=0) and not seed.rooms).delete(bulk=True)
slots = Slot.select(lambda slot: not slot.seed).delete(bulk=True)
# Command gets deleted by ponyorm Cascade Delete, as Room is Required
if rooms or seeds or slots:
logging.info(f"{rooms} Rooms, {seeds} Seeds and {slots} Slots have been deleted.")
run_guardian()
cleanup()
hosters = []
for x in range(config["HOSTERS"]):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could HOSTERS be added to docs/webhost configuration sample.yaml ?

hoster = MultiworldInstance(config, x)
hosters.append(hoster)
hoster.start()

while 1:
time.sleep(0.1)
with db_session:
rooms = select(
room for room in Room if
room.last_activity >= datetime.utcnow() - timedelta(days=3))
for room in rooms:
launch_room(room, config)
# we have to filter twice, as the per-room timeout can't currently be PonyORM transpiled.
if room.last_activity >= datetime.utcnow() - timedelta(seconds=room.timeout):
hosters[room.id.int % len(hosters)].start_room(room.id)

except AlreadyRunningException:
logging.info("Autohost reports as already running, not starting another.")
Expand Down Expand Up @@ -132,29 +132,38 @@ def keep_running():


class MultiworldInstance():
def __init__(self, room: Room, config: dict):
self.room_id = room.id
def __init__(self, config: dict, id: int):
self.room_ids = set()
self.process: typing.Optional[multiprocessing.Process] = None
with guardian_lock:
multiworlds[self.room_id] = self
self.ponyconfig = config["PONY"]
self.cert = config["SELFLAUNCHCERT"]
self.key = config["SELFLAUNCHKEY"]
self.host = config["HOST_ADDRESS"]
self.rooms_to_start = multiprocessing.Queue()
self.rooms_shutting_down = multiprocessing.Queue()
self.name = f"MultiHoster{id}"

def start(self):
if self.process and self.process.is_alive():
return False

logging.info(f"Spinning up {self.room_id}")
process = multiprocessing.Process(group=None, target=run_server_process,
args=(self.room_id, self.ponyconfig, get_static_server_data(),
self.cert, self.key, self.host),
name="MultiHost")
args=(self.name, self.ponyconfig, get_static_server_data(),
self.cert, self.key, self.host,
self.rooms_to_start, self.rooms_shutting_down),
name=self.name)
process.start()
# bind after start to prevent thread sync issues with guardian.
self.process = process

def start_room(self, room_id):
while not self.rooms_shutting_down.empty():
self.room_ids.remove(self.rooms_shutting_down.get(block=True, timeout=None))
if room_id in self.room_ids:
pass # should already be hosted currently.
else:
self.room_ids.add(room_id)
self.rooms_to_start.put(room_id)

def stop(self):
if self.process:
self.process.terminate()
Expand All @@ -168,40 +177,6 @@ def collect(self):
self.process = None


guardian = None
guardian_lock = threading.Lock()


def run_guardian():
global guardian
global multiworlds
with guardian_lock:
if not guardian:
try:
import resource
except ModuleNotFoundError:
pass # unix only module
else:
# Each Server is another file handle, so request as many as we can from the system
file_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
# set soft limit to hard limit
resource.setrlimit(resource.RLIMIT_NOFILE, (file_limit, file_limit))

def guard():
while 1:
time.sleep(1)
done = []
with guardian_lock:
for key, instance in multiworlds.items():
if instance.done():
instance.collect()
done.append(key)
for key in done:
del (multiworlds[key])

guardian = threading.Thread(name="Guardian", target=guard)


from .models import Room, Generation, STATE_QUEUED, STATE_STARTED, STATE_ERROR, db, Seed, Slot
from .customserver import run_server_process, get_static_server_data
from .generate import gen_game
Loading
Loading