diff --git a/app/__init__.py b/app/__init__.py index 2f9678c0..3d329d48 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -4,4 +4,4 @@ from . import server from . import events from . import common -from . import jobs +from . import tasks diff --git a/app/clients/handler.py b/app/clients/handler.py index fd6db009..650da09d 100644 --- a/app/clients/handler.py +++ b/app/clients/handler.py @@ -3,6 +3,7 @@ from . import DefaultRequestPacket as RequestPacket from ..common.database.objects import DBBeatmap, DBScore +from ..common.database.repositories import wrapper from ..objects.multiplayer import Match from ..objects.channel import Channel from ..objects.player import Player @@ -55,25 +56,23 @@ def wrapper(func) -> Callable: return wrapper +@wrapper.exception_wrapper() def resolve_channel(channel_name: str, player: Player) -> Optional[Channel]: - try: - if channel_name == '#spectator': - # Select spectator chat - return ( - player.spectating.spectator_chat - if player.spectating else - player.spectator_chat - ) + if channel_name == '#spectator': + # Select spectator chat + return ( + player.spectating.spectator_chat + if player.spectating else + player.spectator_chat + ) - elif channel_name == '#multiplayer': - # Select multiplayer chat - return player.match.chat + elif channel_name == '#multiplayer': + # Select multiplayer chat + return player.match.chat - # Resolve channel by name - if channel := session.channels.by_name(channel_name): - return channel - except AttributeError: - return + # Resolve channel by name + if channel := session.channels.by_name(channel_name): + return channel @register(RequestPacket.PONG) def pong(player: Player): @@ -89,10 +88,16 @@ def receive_updates(player: Player, filter: PresenceFilter): player.filter = filter if filter.value <= 0: + # Client set filter to "None" + # No players will be sent return - players = session.players if filter == PresenceFilter.All else \ - player.online_friends + # Account for player filter + players = ( + session.players + if filter == PresenceFilter.All + else player.online_friends + ) player.enqueue_players(players, stats_only=True) @@ -206,6 +211,11 @@ def send_message(player: Player, message: bMessage): @register(RequestPacket.SEND_PRIVATE_MESSAGE) def send_private_message(sender: Player, message: bMessage): + if message.target == 'peppy': + # This could be an internal osu! anti-cheat message + officer.call(f'{sender.name} tried to message peppy: "{message.content}"') + return + if not (target := session.players.by_name(message.target)): sender.revoke_channel(message.target) return @@ -290,7 +300,7 @@ def send_private_message(sender: Player, message: bMessage): messages.create( sender.name, target.name, - message.content + message.content[:512] ) sender.update_activity() @@ -311,17 +321,18 @@ def away_message(player: Player, message: bMessage): is_private=True ) ) - else: - player.away_message = None - player.enqueue_message( - bMessage( - session.bot_player.name, - 'You are no longer marked as being away', - session.bot_player.name, - session.bot_player.id, - is_private=True - ) + return + + player.away_message = None + player.enqueue_message( + bMessage( + session.bot_player.name, + 'You are no longer marked as being away', + session.bot_player.name, + session.bot_player.id, + is_private=True ) + ) @register(RequestPacket.ADD_FRIEND) def add_friend(player: Player, target_id: int): @@ -377,9 +388,7 @@ def beatmap_info(player: Player, info: bBeatmapInfoRequest, ignore_limit: bool = if total_maps <= 0: return - player.logger.info( - f'Got {total_maps} beatmap requests' - ) + player.logger.info(f'Got {total_maps} beatmap requests') # Fetch all matching beatmaps from database with session.database.managed_session() as s: @@ -420,14 +429,19 @@ def beatmap_info(player: Player, info: bBeatmapInfoRequest, ignore_limit: bool = map_infos: List[bBeatmapInfo] = [] for index, beatmap in maps: + if beatmap.status <= -3: + # Not submitted + continue + ranked = { - -2: 0, # Graveyard: Pending - -1: 0, # WIP: Pending - 0: 0, # Pending: Pending - 1: 1, # Ranked: Ranked - 2: 2, # Approved: Approved - 3: 2, # Qualified: Approved - 4: 2, # Loved: Approved + -3: -1, # Not submitted + -2: 0, # Graveyard: Pending + -1: 0, # WIP: Pending + 0: 0, # Pending: Pending + 1: 1, # Ranked: Ranked + 2: 2, # Approved: Approved + 3: 2, # Qualified: Approved + 4: 2, # Loved: Approved }[beatmap.status] # Get personal best in every mode for this beatmap @@ -454,12 +468,12 @@ def beatmap_info(player: Player, info: bBeatmapInfoRequest, ignore_limit: bool = index, beatmap.id, beatmap.set_id, - beatmap.set_id, # thread_id + beatmap.set_id, # ThreadId ranked, - grades[0], # standard - grades[2], # fruits - grades[1], # taiko - grades[3], # mania + grades[0], # Standard + grades[2], # Fruits + grades[1], # Taiko + grades[3], # Mania beatmap.md5 ) ) @@ -489,7 +503,6 @@ def start_spectating(player: Player, player_id: int): if (player.spectating or player in target.spectators) and not player.is_tourney_client: stop_spectating(player) - # TODO: return here? player.logger.info(f'Started spectating "{target.name}".') player.spectating = target @@ -585,7 +598,6 @@ def invite(player: Player, target_id: int): return # TODO: Check invite spams - target.enqueue_invite( bMessage( player.name, @@ -747,10 +759,11 @@ def leave_match(player: Player): slot = player.match.get_slot(player) assert slot is not None - if slot.status == SlotStatus.Locked: - status = SlotStatus.Locked - else: - status = SlotStatus.Open + status = ( + SlotStatus.Locked + if slot.status == SlotStatus.Locked + else SlotStatus.Open + ) slot.reset(status) @@ -776,12 +789,12 @@ def leave_match(player: Player): player.match.beatmap_name = player.match.previous_beatmap_name if all(slot.empty for slot in player.match.slots): + # No players in match anymore -> Disband match player.enqueue_match_disband(player.match.id) for p in session.players.in_lobby: p.enqueue_match_disband(player.match.id) - # Match is empty session.matches.remove(player.match) player.match.starting = None @@ -800,25 +813,26 @@ def leave_match(player: Player): events.create(match_id, type=EventType.Disband) player.match.logger.info('Match was disbanded.') - else: - if player is player.match.host: - # Player was host, transfer to next player - for slot in player.match.slots: - if slot.status.value & SlotStatus.HasPlayer.value: - player.match.host = slot.player - player.match.host.enqueue_match_transferhost() - - events.create( - player.match.db_match.id, - type=EventType.Host, - data={ - 'previous': {'id': player.id, 'name': player.name}, - 'new': {'id': player.match.host.id, 'name': player.match.host.name} - } - ) + player.match = None + return - player.match.update() + if player is player.match.host: + # Player was host, transfer to next player + for slot in player.match.slots: + if slot.status.value & SlotStatus.HasPlayer.value: + player.match.host = slot.player + player.match.host.enqueue_match_transferhost() + + events.create( + player.match.db_match.id, + type=EventType.Host, + data={ + 'previous': {'id': player.id, 'name': player.name}, + 'new': {'id': player.match.host.id, 'name': player.match.host.name} + } + ) + player.match.update() player.match = None @register(RequestPacket.MATCH_CHANGE_SLOT) @@ -905,7 +919,7 @@ def change_mods(player: Player, mods: Mods): if player.match.freemod: if player is player.match.host: - # Onky keep SpeedMods + # Only keep SpeedMods player.match.mods = mods & Mods.SpeedMods # There is a bug, where DT and NC are enabled at the same time @@ -1007,10 +1021,11 @@ def lock(player: Player, slot_id: int): if slot.has_player: player.match.kick_player(slot.player) - if slot.status == SlotStatus.Locked: - slot.status = SlotStatus.Open - else: - slot.status = SlotStatus.Locked + slot.status = ( + SlotStatus.Open + if slot.status == SlotStatus.Locked + else SlotStatus.Locked + ) player.match.update() diff --git a/app/clients/versions/b20121223/decoder.py b/app/clients/versions/b20121223/decoder.py index 19393856..fe8e7c43 100644 --- a/app/clients/versions/b20121223/decoder.py +++ b/app/clients/versions/b20121223/decoder.py @@ -18,3 +18,11 @@ def wrapper(func) -> Callable: @register(RequestPacket.SEND_MESSAGE) def message(stream: StreamIn): return Reader(stream).read_message() + +@register(RequestPacket.SEND_PRIVATE_MESSAGE) +def private_message(stream: StreamIn): + return Reader(stream).read_message() + +@register(RequestPacket.SET_AWAY_MESSAGE) +def away_message(stream: StreamIn): + return Reader(stream).read_message() diff --git a/app/clients/versions/b20121223/encoder.py b/app/clients/versions/b20121223/encoder.py index aa7dee53..29f85256 100644 --- a/app/clients/versions/b20121223/encoder.py +++ b/app/clients/versions/b20121223/encoder.py @@ -20,3 +20,21 @@ def message(message: bMessage): writer = Writer() writer.write_message(message) return writer.stream.get() + +@register(ResponsePacket.TARGET_IS_SILENCED) +def target_silenced(msg: bMessage): + writer = Writer() + writer.write_message(msg) + return writer.stream.get() + +@register(ResponsePacket.USER_DM_BLOCKED) +def dm_blocked(msg: bMessage): + writer = Writer() + writer.write_message(msg) + return writer.stream.get() + +@register(ResponsePacket.INVITE) +def match_invite(msg: bMessage): + writer = Writer() + writer.write_message(msg) + return writer.stream.get() \ No newline at end of file diff --git a/app/clients/versions/b388/encoder.py b/app/clients/versions/b388/encoder.py index 3fcd5d93..65d3df77 100644 --- a/app/clients/versions/b388/encoder.py +++ b/app/clients/versions/b388/encoder.py @@ -21,6 +21,7 @@ def beatmap_info_reply(reply: bBeatmapInfoReply): for info in reply.beatmaps: # Approved status does not exist info.ranked = { + -1: -1, 0: 0, 1: 1, 2: 1 diff --git a/app/commands.py b/app/commands.py index c272ab12..d0655b27 100644 --- a/app/commands.py +++ b/app/commands.py @@ -4,6 +4,7 @@ from typing import List, NamedTuple, Callable from pytimeparse.timeparse import timeparse from datetime import timedelta, datetime +from twisted.internet import threads from dataclasses import dataclass from threading import Thread @@ -38,13 +39,11 @@ from .objects.channel import Channel from .common.objects import bMessage from .objects.player import Player -from .tcp import TcpBanchoProtocol import timeago import config import random import shlex -import utils import time import app import os @@ -1351,37 +1350,39 @@ def get_command( # Regular commands for command in commands: - if trigger in command.triggers: - has_permissions = any( - group in command.groups - for group in player.groups - ) + if trigger not in command.triggers: + continue - if not has_permissions: - return - - # Try running the command - try: - response = command.callback( - Context( - player, - trigger, - target, - args - ) - ) - except Exception as e: - player.logger.error( - f'Command error: {e}', - exc_info=e - ) + has_permissions = any( + group in command.groups + for group in player.groups + ) - response = ['An error occurred while running this command.'] + if not has_permissions: + return - return CommandResponse( - response, - command.hidden + # Try running the command + try: + response = command.callback( + Context( + player, + trigger, + target, + args + ) ) + except Exception as e: + player.logger.error( + f'Command error: {e}', + exc_info=e + ) + + response = ['An error occurred while running this command.'] + + return CommandResponse( + response, + command.hidden + ) try: set_trigger, trigger, *args = trigger, *args @@ -1394,57 +1395,74 @@ def get_command( continue for command in set.commands: - if trigger in command.triggers: - has_permissions = any( - group in command.groups - for group in player.groups - ) + if trigger not in command.triggers: + continue - if not has_permissions: - continue + has_permissions = any( + group in command.groups + for group in player.groups + ) - ctx = Context( - player, - trigger, - target, - args - ) + if not has_permissions: + continue - # Check set conditions - for condition in set.conditions: - if not condition(ctx): - break - else: - # Try running the command - try: - response = command.callback(ctx) - except Exception as e: - player.logger.error( - f'Command error: {e}', - exc_info=e - ) - - response = ['An error occurred while running this command.'] - - return CommandResponse( - response, - command.hidden + ctx = Context( + player, + trigger, + target, + args + ) + + # Check set conditions + for condition in set.conditions: + if not condition(ctx): + break + + else: + # Try running the command + try: + response = command.callback(ctx) + except Exception as e: + player.logger.error( + f'Command error: {e}', + exc_info=e ) + response = ['An error occurred while running this command.'] + + return CommandResponse( + response, + command.hidden + ) + def execute( player: Player, target: Channel | Player, command_message: str -): +) -> None: if not command_message.startswith('!'): command_message = f'!{command_message}' - command = get_command( + threads.deferToThread( + get_command, player, target, command_message + ).addCallback( + lambda result: on_command_done( + result, + player, + target, + command_message + ) ) +def on_command_done( + command: CommandResponse, + player: Player, + target: Channel | Player, + command_message: str +) -> None: if not command: return diff --git a/app/common b/app/common index 2962bb74..02bbad76 160000 --- a/app/common +++ b/app/common @@ -1 +1 @@ -Subproject commit 2962bb74aef036f1c4c33ae7a34f76e1f2cfc231 +Subproject commit 02bbad76a94159d392a460b4a5023757266b1074 diff --git a/app/http.py b/app/http.py index 54a94f85..47af1a09 100644 --- a/app/http.py +++ b/app/http.py @@ -76,14 +76,14 @@ def handle_login_request(self, request: Request) -> bytes: request.setResponseCode(403) return b'' - username, password, client_data = ( - request.content.read().decode().splitlines() - ) + try: + username, password, client_data = ( + request.content.read().decode().splitlines() + ) - ip_address = ip.resolve_ip_address_twisted(request) - client = OsuClient.from_string(client_data, ip_address) + ip_address = ip.resolve_ip_address_twisted(request) + client = OsuClient.from_string(client_data, ip_address) - try: self.player = HttpPlayer( ip_address, request.getClientAddress().port @@ -148,7 +148,7 @@ def render_POST(self, request: Request) -> bytes: return self.handle_login_request(request) if not (player := app.session.players.by_token(osu_token)): - # Tell client to reconnect immediately + # Tell client to reconnect immediately (restart packet) return b'\x56\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00' self.player = player diff --git a/app/jobs/__init__.py b/app/jobs/__init__.py deleted file mode 100644 index 1924f49f..00000000 --- a/app/jobs/__init__.py +++ /dev/null @@ -1,44 +0,0 @@ - -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures._base import Future -from typing import Any, Callable, Tuple - -from . import events -from . import pings - -import logging -import time - -class Jobs(ThreadPoolExecutor): - def __init__(self, max_workers = None, thread_name_prefix: str = "job", initializer = None, initargs: Tuple[Any, ...] = ...) -> None: - super().__init__(max_workers, thread_name_prefix, initializer, initargs) - - self.logger = logging.getLogger('anchor') - - def submit(self, fn: Callable, *args, **kwargs) -> Future: - future = super().submit(fn, *args, **kwargs) - future.add_done_callback(self.__future_callback) - self.logger.info(f' - Starting job: "{fn.__name__}"') - return future - - def sleep(self, seconds: int): - for _ in range(int(seconds)): - if self._shutdown: - # Exit thread - exit() - - time.sleep(1) - - def __future_callback(self, future: Future): - if e := future.exception(): - if isinstance(e, SystemExit): - return - - self.logger.error( - f'Future {future.__repr__()} raised an exception: {e}', - exc_info=e - ) - - self.logger.debug( - f'Result for job {future.__repr__()}: {future.result()}' - ) diff --git a/app/objects/channel.py b/app/objects/channel.py index 5d4442a4..a4cd8fa7 100644 --- a/app/objects/channel.py +++ b/app/objects/channel.py @@ -1,8 +1,14 @@ +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from app.objects.player import Player + from app.common.database.repositories import messages from app.common.constants.strings import BAD_WORDS from app.common.objects import bMessage, bChannel from app.common.constants import Permissions +from app.objects import collections from app.common import officer import logging @@ -29,10 +35,7 @@ def __init__( self.public = public self.logger = logging.getLogger(self.name) - - from .collections import Players - - self.users = Players() + self.users = collections.Players() def __repr__(self) -> str: return f'<{self.name} - {self.topic}>' @@ -76,6 +79,7 @@ def can_write(self, perms: Permissions): def update(self): if not self.public: + # Only enqueue to users in this channel for player in self.users: player.enqueue_channel( self.bancho_channel, @@ -90,7 +94,7 @@ def update(self): autojoin=False ) - def add(self, player, no_response: bool = False): + def add(self, player: "Player", no_response: bool = False) -> None: # Update player's silence duration player.silenced @@ -114,11 +118,8 @@ def add(self, player, no_response: bool = False): self.logger.info(f'{player.name} joined') - def remove(self, player) -> None: - try: - self.users.remove(player) - except ValueError: - pass + def remove(self, player: "Player") -> None: + self.users.remove(player) if self in player.channels: player.channels.remove(self) @@ -127,7 +128,7 @@ def remove(self, player) -> None: def send_message( self, - sender, + sender: "Player", message: str, ignore_privs=False, exclude_sender=True, @@ -224,5 +225,5 @@ def send_message( messages.create( sender.name, self.display_name, - message + message[:512] ) diff --git a/app/objects/client.py b/app/objects/client.py index 887b4013..20b42319 100644 --- a/app/objects/client.py +++ b/app/objects/client.py @@ -6,7 +6,6 @@ from datetime import datetime import hashlib -import utils import pytz import re @@ -130,9 +129,9 @@ def __init__( self.ip = ip @classmethod - def from_string(cls, line: str, ip: str): + def from_string(cls, line: str, ip: str) -> "OsuClient": if len(args := line.split('|')) < 2: - return + return OsuClient.empty() # Sent in every client version build_version = args[0] @@ -168,10 +167,10 @@ def from_string(cls, line: str, ip: str): ) @classmethod - def empty(cls): + def empty(cls) -> "OsuClient": return OsuClient( location.fetch_geolocation('127.0.0.1'), - ClientVersion(OSU_VERSION.match('b1337'), 1337), + ClientVersion(OSU_VERSION.match('b20136969'), 20136969), ClientHash('', '', '', '', ''), 0, True, diff --git a/app/objects/multiplayer.py b/app/objects/multiplayer.py index 9dad6e14..b8d1a0fc 100644 --- a/app/objects/multiplayer.py +++ b/app/objects/multiplayer.py @@ -390,7 +390,6 @@ def change_settings(self, new_match: bMatch): if self.mode != new_match.mode: self.mode = new_match.mode self.logger.info(f'Mode: {self.mode.formatted}') - # TODO: Check osu! mania support if self.name != new_match.name: self.name = new_match.name @@ -501,8 +500,6 @@ def start(self): if slot.status == SlotStatus.NotReady: continue - # TODO: Check osu! mania support - slot.player.enqueue_match_start(self.bancho_match) if slot.status != SlotStatus.NoMap: diff --git a/app/objects/player.py b/app/objects/player.py index 9ca9cae9..a5b1a4e9 100644 --- a/app/objects/player.py +++ b/app/objects/player.py @@ -148,24 +148,32 @@ def bot_player(cls): @property def is_bot(self) -> bool: - return self.object.is_bot if self.object else False + return ( + self.object.is_bot + if self.object else False + ) @property def silenced(self) -> bool: - if self.object.silence_end: - if self.remaining_silence > 0: - return True - else: - # User is not silenced anymore - self.unsilence() - return False - return False + if not self.object.silence_end: + return False + + if self.remaining_silence < 0: + # User is not silenced anymore + self.unsilence() + return False + + return True @property def remaining_silence(self) -> int: - if self.object.silence_end: - return self.object.silence_end.timestamp() - datetime.now().timestamp() - return 0 + if not self.object.silence_end: + return 0 + + return ( + self.object.silence_end.timestamp() - + datetime.now().timestamp() + ) @property def supporter(self) -> bool: @@ -197,11 +205,13 @@ def restricted(self) -> bool: @property def current_stats(self) -> DBStats | None: - for stats in self.stats: - if stats.mode == self.status.mode.value: - return stats - self.logger.warning('Failed to load current stats!') - return None + return next( + ( + stats for stats in self.stats + if stats.mode == self.status.mode.value + ), + None + ) @property def friends(self) -> List[int]: @@ -387,9 +397,6 @@ def connectionLost(self, reason: Failure = Failure(ConnectionDone())): app.clients.handler.leave_match(self) def send_packet(self, packet: Enum, *args) -> None: - if self.is_bot: - return - try: stream = StreamOut() data = self.encoders[packet](*args) @@ -682,14 +689,16 @@ def login_success(self): # Enqueue all public channels for channel in app.session.channels.public: - if channel.can_read(self.permissions): - # Check if channel should be autojoined - if channel.name in config.AUTOJOIN_CHANNELS: - self.enqueue_channel(channel, autojoin=True) - channel.add(self) - continue + if not channel.can_read(self.permissions): + continue - self.enqueue_channel(channel) + # Check if channel should be autojoined + if channel.name in config.AUTOJOIN_CHANNELS: + self.enqueue_channel(channel, autojoin=True) + channel.add(self) + continue + + self.enqueue_channel(channel) self.send_packet(self.packets.CHANNEL_INFO_COMPLETE) @@ -767,9 +776,6 @@ def check_client(self, session: Session | None = None): self.enqueue_announcement(strings.MULTIACCOUNTING_DETECTED) def packet_received(self, packet_id: int, stream: StreamIn): - if self.is_bot: - return - self.last_response = time.time() try: @@ -800,8 +806,9 @@ def packet_received(self, packet_id: int, stream: StreamIn): if args != None: handler_function(self, args) - else: - handler_function(self) + return + + handler_function(self) def silence(self, duration_sec: int, reason: str | None = None): if self.is_bot: @@ -1256,8 +1263,11 @@ def enqueue_announcement(self, message: str): message ) + def enqueue_server_restart(self, retry_ms: int): + self.send_packet( + self.packets.RESTART, + retry_ms + ) + def enqueue_monitor(self): self.send_packet(self.packets.MONITOR) - - def enqueue_server_restart(self, retry_ms: int): - self.send_packet(self.packets.RESTART, retry_ms) diff --git a/app/session.py b/app/session.py index 5f534c60..9d1d70a2 100644 --- a/app/session.py +++ b/app/session.py @@ -4,7 +4,7 @@ from .clients import DefaultResponsePacket from .common.database import Postgres from .common.storage import Storage -from .jobs import Jobs +from .tasks import Tasks from typing import Callable, Dict from requests import Session @@ -44,4 +44,4 @@ storage = Storage() players = Players() matches = Matches() -jobs = Jobs() +tasks = Tasks() diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 00000000..2e2f0612 --- /dev/null +++ b/app/tasks/__init__.py @@ -0,0 +1,46 @@ + +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures._base import Future +from typing import Callable + +from . import activities +from . import events +from . import pings + +import logging +import time + +class Tasks(ThreadPoolExecutor): + def __init__(self) -> None: + super().__init__(thread_name_prefix='task') + self.logger = logging.getLogger('anchor') + + def submit(self, fn: Callable, *args, **kwargs) -> Future: + """Submit a task to the threadpool""" + future = super().submit(fn, *args, **kwargs) + future.add_done_callback(self.future_callback) + self.logger.info(f' - Starting task: "{fn.__name__}"') + return future + + def sleep(self, seconds: int, interval: int = 1): + """Custom sleep function to check for application shutdowns""" + for _ in range(0, seconds, interval): + if self._shutdown: + exit() + + time.sleep(interval) + + def future_callback(self, future: Future): + """Callback function for a task/future""" + if e := future.exception(): + if isinstance(e, SystemExit): + return + + self.logger.error( + f'Future {future.__repr__()} raised an exception: {e}', + exc_info=e + ) + + self.logger.debug( + f'Result for task {future.__repr__()}: {future.result()}' + ) diff --git a/app/jobs/activities.py b/app/tasks/activities.py similarity index 77% rename from app/jobs/activities.py rename to app/tasks/activities.py index 87316547..562174d0 100644 --- a/app/jobs/activities.py +++ b/app/tasks/activities.py @@ -6,9 +6,9 @@ MATCH_TIMEOUT_SECONDS = MATCH_TIMEOUT_MINUTES * 60 def match_activity(): - """This job will close any matches that have not been active in the last 15 minutes.""" + """This task will close any matches that have not been active in the last 15 minutes.""" while True: - if app.session.jobs._shutdown: + if app.session.tasks._shutdown: exit() for match in app.session.matches.active: @@ -23,4 +23,4 @@ def match_activity(): ) match.close() - app.session.jobs.sleep(5) + app.session.tasks.sleep(5) diff --git a/app/jobs/events.py b/app/tasks/events.py similarity index 87% rename from app/jobs/events.py rename to app/tasks/events.py index bc6cba2f..12e6c2b5 100644 --- a/app/jobs/events.py +++ b/app/tasks/events.py @@ -5,7 +5,7 @@ def event_listener(): """This will listen for redis pubsub events and call the appropriate functions.""" events = app.session.events.listen() - if app.session.jobs._shutdown: + if app.session.tasks._shutdown: exit() for func, args, kwargs in events: diff --git a/app/jobs/pings.py b/app/tasks/pings.py similarity index 84% rename from app/jobs/pings.py rename to app/tasks/pings.py index a08697d9..353823d0 100644 --- a/app/jobs/pings.py +++ b/app/tasks/pings.py @@ -8,7 +8,7 @@ def ping(): """ - This job will handle client pings and timeouts. Pings are required for tcp clients, to keep them connected. + This task will handle client pings and timeouts. Pings are required for tcp clients, to keep them connected. For http clients, we can just check if they have responded within the timeout period, and close the connection if not. """ next_ping = (time.time() - PING_INTERVAL) @@ -40,13 +40,13 @@ def ping(): player.logger.warning('Client timed out!') player.close_connection() -def ping_job(): +def ping_task(): while True: - if app.session.jobs._shutdown: + if app.session.tasks._shutdown: exit() try: ping() time.sleep(1) except Exception as e: - officer.call(f'Ping job failed: {e}', exc_info=e) + officer.call(f'Ping task failed: {e}', exc_info=e) diff --git a/app/tcp.py b/app/tcp.py index 2847fd5e..ed483b31 100644 --- a/app/tcp.py +++ b/app/tcp.py @@ -57,13 +57,6 @@ def enqueue(self, data: bytes): exc_info=e ) - def send_web_response(self): - self.enqueue('\r\n'.join([ - 'HTTP/1.1 200 OK', - 'content-type: text/html', - ANCHOR_WEB_RESPONSE - ]).encode()) - def close_connection(self, error: Exception | None = None): if not self.is_local or config.DEBUG: if error: @@ -89,12 +82,6 @@ def dataReceived(self, data: bytes): self.buffer += data.replace(b'\r', b'') self.busy = True - if data.startswith(b'GET /'): - # We received a web request - self.send_web_response() - self.close_connection() - return - if self.buffer.count(b'\n') < 3: return diff --git a/config.py b/config.py index fc03e146..74a9da8f 100644 --- a/config.py +++ b/config.py @@ -53,4 +53,4 @@ DATA_PATH = os.path.abspath('.data') MULTIPLAYER_MAX_SLOTS = 8 PROTOCOL_VERSION = 18 -VERSION = '1.2.10' +VERSION = '1.3.0' diff --git a/main.py b/main.py index 9977a9fd..b934639c 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,7 @@ from twisted.internet import reactor -from app.common.database.repositories import channels +from app.common.database.repositories import channels, wrapper from app.common.cache import status, usercount from app.server import TcpBanchoFactory, HttpBanchoFactory @@ -10,7 +10,7 @@ from app.common.logging import Console, File from app.objects.channel import Channel from app.objects.player import Player -from app.jobs import ( +from app.tasks import ( activities, events, pings @@ -24,9 +24,11 @@ logging.basicConfig( handlers=[Console, File], - level=logging.DEBUG + level=( + logging.DEBUG if config.DEBUG else logging.INFO + ) ) def setup(): @@ -57,10 +59,10 @@ def setup(): app.session.bot_player = bot_player app.session.logger.info(f' - {bot_player.name}') - app.session.logger.info('Loading jobs...') - app.session.jobs.submit(pings.ping_job) - app.session.jobs.submit(events.event_listener) - app.session.jobs.submit(activities.match_activity) + app.session.logger.info('Loading tasks...') + app.session.tasks.submit(pings.ping_task) + app.session.tasks.submit(events.event_listener) + app.session.tasks.submit(activities.match_activity) # Reset usercount usercount.set(0) @@ -72,6 +74,8 @@ def setup(): def before_shutdown(*args): for player in app.session.players: + # Enqueue server restart packet to all players + # They should reconnect after 15 seconds player.enqueue_server_restart(15 * 1000) reactor.callLater(0.5, reactor.stop) @@ -86,29 +90,32 @@ def shutdown(): status.delete(player.id) app.session.events.submit('shutdown') - app.session.jobs.shutdown(cancel_futures=True, wait=False) + app.session.tasks.shutdown(cancel_futures=True, wait=False) - def force_exit(signal, frame): + def force_exit(*args): app.session.logger.warning("Force exiting...") os._exit(0) signal.signal(signal.SIGINT, force_exit) -def main(): - try: - http_factory = HttpBanchoFactory() - tcp_factory = TcpBanchoFactory() +def on_startup_fail(e: Exception): + app.session.logger.fatal(f'Failed to start server: "{e}"') + reactor.stop() - reactor.suggestThreadPoolSize(config.BANCHO_WORKERS) - reactor.listenTCP(config.HTTP_PORT, http_factory) +@wrapper.exception_wrapper(on_startup_fail) +def setup_servers(): + http_factory = HttpBanchoFactory() + tcp_factory = TcpBanchoFactory() - for port in config.TCP_PORTS: - reactor.listenTCP(port, tcp_factory) - except Exception as e: - app.session.logger.error(f'Failed to start server: "{e}"') - exit(1) + reactor.suggestThreadPoolSize(config.BANCHO_WORKERS) + reactor.listenTCP(config.HTTP_PORT, http_factory) + for port in config.TCP_PORTS: + reactor.listenTCP(port, tcp_factory) + +def main(): reactor.addSystemEventTrigger('before', 'startup', setup) + reactor.addSystemEventTrigger('before', 'startup', setup_servers) reactor.addSystemEventTrigger('after', 'shutdown', shutdown) reactor.run() diff --git a/requirements.txt b/requirements.txt index 25ae9d64..f1cffc62 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ twisted==24.3.0 redis==5.0.4 requests==2.32.0 boto3-type-annotations-with-docs==0.3.1 -boto3==1.34.98 +boto3==1.34.108 pytz==2024.1 pytimeparse==1.1.8 geoip2==4.8.0 diff --git a/utils.py b/utils.py deleted file mode 100644 index 7b5f09ec..00000000 --- a/utils.py +++ /dev/null @@ -1,12 +0,0 @@ - -from twisted.python.failure import Failure -from twisted.web.http import Request - -import config -import app - -def thread_callback(error: Failure): - app.session.logger.error( - f'Failed to execute thread: {error.__str__()} ({error.getErrorMessage()})', - exc_info=error.value - )