diff --git a/docs/tests/protocols/index.rst b/docs/tests/protocols/index.rst index f399557..ee7b40c 100644 --- a/docs/tests/protocols/index.rst +++ b/docs/tests/protocols/index.rst @@ -4,45 +4,47 @@ Protocols Tests =============== .. toctree:: - test_ase/index - test_avp2/index - test_battlefield/index - test_battlefield2/index - test_cod1/index test_cod4/index - test_doom3/index - test_eldewrito/index - test_eos/index - test_fivem/index + test_halo1/index test_flatout2/index - test_gamespy1/index + test_battlefield2/index + test_ssc/index + test_source/index + test_won/index + test_fivem/index test_gamespy2/index + test_nadeo/index + test_trackmania_nations/index + test_ut3/index + test_eldewrito/index + test_eos/index + test_renegadex/index + test_stronghold_ce/index + test_quake2/index test_gamespy3/index - test_gamespy4/index - test_halo1/index + test_stronghold_crusader/index + test_supcom/index test_kaillera/index + test_toxikk/index + test_avp2/index + test_gamespy1/index + test_scum/index + test_raknet/index test_killingfloor/index - test_minecraft/index - test_nadeo/index + test_battlefield/index test_palworld/index - test_quake1/index - test_quake2/index - test_quake3/index - test_raknet/index - test_renegadex/index + test_doom3/index + test_w40kdow/index test_samp/index - test_satisfactory/index - test_scum/index - test_source/index - test_ssc/index - test_stronghold_ce/index - test_stronghold_crusader/index + test_ase/index test_teamspeak3/index - test_toxikk/index - test_trackmania_nations/index - test_unreal2/index - test_ut3/index test_vcmp/index - test_w40kdow/index + test_minecraft/index + test_quake3/index test_warcraft3/index - test_won/index + test_quake1/index + test_jediknight/index + test_unreal2/index + test_gamespy4/index + test_cod1/index + test_satisfactory/index diff --git a/docs/tests/protocols/test_jediknight/index.rst b/docs/tests/protocols/test_jediknight/index.rst new file mode 100644 index 0000000..21d68f3 --- /dev/null +++ b/docs/tests/protocols/test_jediknight/index.rst @@ -0,0 +1,10 @@ +.. _test_jediknight: + +test_jediknight +=============== + +.. toctree:: + test_get_status + test_get_info + test_protocol_properties + test_get_full_status diff --git a/docs/tests/protocols/test_jediknight/test_get_full_status.rst b/docs/tests/protocols/test_jediknight/test_get_full_status.rst new file mode 100644 index 0000000..97b5a4e --- /dev/null +++ b/docs/tests/protocols/test_jediknight/test_get_full_status.rst @@ -0,0 +1,75 @@ +test_get_full_status +==================== + +Here are the results for the test method. + +.. code-block:: json + + { + "info": { + "fdisable": "0", + "wdisable": "0", + "truejedi": "0", + "needpass": "0", + "gametype": "0", + "sv_maxclients": "8", + "clients": "1", + "mapname": "mp/ffa1", + "hostname": "*Jedi*", + "protocol": "26", + "challenge": "xxx", + "gametype_translated": "Free For All" + }, + "status": { + "g_siegeTeam2": "none", + "g_siegeTeam1": "none", + "g_siegeRespawn": "20", + "g_weaponDisable": "0", + "g_forcePowerDisable": "0", + "g_forceRegenTime": "200", + "g_jediVmerc": "0", + "g_maxGameClients": "0", + "sv_maxclients": "8", + "g_duelWeaponDisable": "524279", + "g_forceBasedTeams": "0", + "duel_fraglimit": "10", + "g_maxForceRank": "6", + "g_saberLocking": "1", + "g_privateDuel": "1", + "timelimit": "0", + "fraglimit": "1", + "dmflags": "0", + "g_siegeTeamSwitch": "1", + "sv_floodProtect": "1", + "sv_maxPing": "0", + "sv_minPing": "0", + "sv_maxRate": "0", + "sv_hostname": "*Jedi*", + "capturelimit": "0", + "version": "JAmp: v1.0.1.0 win-x86 Oct 24 2003", + "g_maxHolocronCarry": "3", + "g_gametype": "0", + "g_needpass": "0", + "protocol": "26", + "mapname": "mp/ffa1", + "sv_privateClients": "0", + "sv_allowDownload": "0", + "bot_minplayers": "0", + "g_debugMelee": "0", + "g_stepSlideFix": "1", + "g_noSpecMove": "0", + "gamename": "basejka", + "g_allowNPC": "1", + "g_saberWallDamageScale": "0.4", + "bg_fighterAltControl": "0", + "g_showDuelHealths": "0", + "players": [ + { + "score": 0, + "ping": 0, + "name": "Padawan" + } + ], + "g_gametype_translated": "Free For All" + } + } diff --git a/docs/tests/protocols/test_jediknight/test_get_info.rst b/docs/tests/protocols/test_jediknight/test_get_info.rst new file mode 100644 index 0000000..6076a11 --- /dev/null +++ b/docs/tests/protocols/test_jediknight/test_get_info.rst @@ -0,0 +1,21 @@ +test_get_info +============= + +Here are the results for the test method. + +.. code-block:: json + + { + "fdisable": "0", + "wdisable": "0", + "truejedi": "0", + "needpass": "0", + "gametype": "0", + "sv_maxclients": "8", + "clients": "1", + "mapname": "mp/ffa1", + "hostname": "*Jedi*", + "protocol": "26", + "challenge": "xxx", + "gametype_translated": "Free For All" + } diff --git a/docs/tests/protocols/test_jediknight/test_get_status.rst b/docs/tests/protocols/test_jediknight/test_get_status.rst new file mode 100644 index 0000000..c6b871b --- /dev/null +++ b/docs/tests/protocols/test_jediknight/test_get_status.rst @@ -0,0 +1,59 @@ +test_get_status +=============== + +Here are the results for the test method. + +.. code-block:: json + + { + "g_siegeTeam2": "none", + "g_siegeTeam1": "none", + "g_siegeRespawn": "20", + "g_weaponDisable": "0", + "g_forcePowerDisable": "0", + "g_forceRegenTime": "200", + "g_jediVmerc": "0", + "g_maxGameClients": "0", + "sv_maxclients": "8", + "g_duelWeaponDisable": "524279", + "g_forceBasedTeams": "0", + "duel_fraglimit": "10", + "g_maxForceRank": "6", + "g_saberLocking": "1", + "g_privateDuel": "1", + "timelimit": "0", + "fraglimit": "1", + "dmflags": "0", + "g_siegeTeamSwitch": "1", + "sv_floodProtect": "1", + "sv_maxPing": "0", + "sv_minPing": "0", + "sv_maxRate": "0", + "sv_hostname": "*Jedi*", + "capturelimit": "0", + "version": "JAmp: v1.0.1.0 win-x86 Oct 24 2003", + "g_maxHolocronCarry": "3", + "g_gametype": "0", + "g_needpass": "0", + "protocol": "26", + "mapname": "mp/ffa1", + "sv_privateClients": "0", + "sv_allowDownload": "0", + "bot_minplayers": "0", + "g_debugMelee": "0", + "g_stepSlideFix": "1", + "g_noSpecMove": "0", + "gamename": "basejka", + "g_allowNPC": "1", + "g_saberWallDamageScale": "0.4", + "bg_fighterAltControl": "0", + "g_showDuelHealths": "0", + "players": [ + { + "score": 0, + "ping": 0, + "name": "Padawan" + } + ], + "g_gametype_translated": "Free For All" + } diff --git a/docs/tests/protocols/test_jediknight/test_protocol_properties.rst b/docs/tests/protocols/test_jediknight/test_protocol_properties.rst new file mode 100644 index 0000000..23da317 --- /dev/null +++ b/docs/tests/protocols/test_jediknight/test_protocol_properties.rst @@ -0,0 +1,14 @@ +test_protocol_properties +======================== + +Here are the results for the test method. + +.. code-block:: json + + { + "_host": "172.29.100.29", + "_port": 29070, + "_timeout": 5.0, + "_allow_broadcast": false, + "_source_port": 29070 + } diff --git a/docs/tests/protocols/test_supcom/index.rst b/docs/tests/protocols/test_supcom/index.rst new file mode 100644 index 0000000..af38e08 --- /dev/null +++ b/docs/tests/protocols/test_supcom/index.rst @@ -0,0 +1,7 @@ +.. _test_supcom: + +test_supcom +=========== + +.. toctree:: + test_get_status diff --git a/docs/tests/protocols/test_supcom/test_get_status.rst b/docs/tests/protocols/test_supcom/test_get_status.rst new file mode 100644 index 0000000..c2072d1 --- /dev/null +++ b/docs/tests/protocols/test_supcom/test_get_status.rst @@ -0,0 +1,92 @@ +test_get_status +=============== + +Here are the results for the test method. + +.. code-block:: json + + { + "game_name": "Banane", + "hosted_by": "Test", + "product_code": "SC1", + "scenario_file": "/maps/scmp_034/scmp_034_scenario.lua", + "num_players": 2, + "max_players": 4, + "map_width": 512, + "map_height": 512, + "map_name_lookup": "High Noon", + "game_speed": "normal", + "victory_condition": "demoralization", + "fog_of_war": "explored", + "unit_cap": "500", + "cheats_enabled": false, + "team_lock": "locked", + "team_spawn": "random", + "allow_observers": true, + "no_rush_option": "Off", + "prebuilt_units": "Off", + "civilian_alliance": "enemy", + "timeouts": "3", + "options": { + "HostedBy": "Test", + "TeamLock": "locked", + "CheatsEnabled": "false", + "AllowObservers": true, + "Victory": "demoralization", + "PrebuiltUnits": "Off", + "CivilianAlliance": "enemy", + "Timeouts": "3", + "NoRushOption": "Off", + "TeamSpawn": "random", + "ScenarioFile": "/maps/scmp_034/scmp_034_scenario.lua", + "UnitCap": "500", + "GameSpeed": "normal", + "FogOfWar": "explored", + "GameName": "Banane", + "ProductCode": "SC1", + "PlayerCount": 2 + }, + "raw": { + "parsed_data": { + "HostedBy": "Test", + "TeamLock": "locked", + "CheatsEnabled": "false", + "AllowObservers": true, + "Victory": "demoralization", + "PrebuiltUnits": "Off", + "CivilianAlliance": "enemy", + "Timeouts": "3", + "NoRushOption": "Off", + "TeamSpawn": "random", + "ScenarioFile": "/maps/scmp_034/scmp_034_scenario.lua", + "UnitCap": "500", + "GameSpeed": "normal", + "FogOfWar": "explored", + "GameName": "Banane", + "ProductCode": "SC1", + "PlayerCount": 2 + }, + "options": {}, + "all_data": { + "HostedBy": "Test", + "TeamLock": "locked", + "CheatsEnabled": "false", + "AllowObservers": true, + "Victory": "demoralization", + "PrebuiltUnits": "Off", + "CivilianAlliance": "enemy", + "Timeouts": "3", + "NoRushOption": "Off", + "TeamSpawn": "random", + "ScenarioFile": "/maps/scmp_034/scmp_034_scenario.lua", + "UnitCap": "500", + "GameSpeed": "normal", + "FogOfWar": "explored", + "GameName": "Banane", + "ProductCode": "SC1", + "PlayerCount": 2 + }, + "response_length": 376, + "raw_hex": "6f78010b010002ccf20401486f73746564427900015465737400014f7074696f6e730004015465616d4c6f636b00016c6f636b65640001436865617473456e61626c6564000166616c73650001416c6c6f774f627365727665727300030101566963746f7279000164656d6f72616c697a6174696f6e00015072656275696c74556e69747300014f66660001436976696c69616e416c6c69616e63650001656e656d79000154696d656f75747300013300014e6f527573684f7074696f6e00014f666600015465616d537061776e000172616e646f6d00015363656e6172696f46696c6500012f6d6170732f73636d705f3033342f73636d705f3033345f7363656e6172696f2e6c75610001556e69744361700001353030000147616d65537065656400016e6f726d616c0001466f674f6657617200016578706c6f72656400050147616d654e616d65000142616e616e65000150726f64756374436f646500015343310001506c61796572436f756e7400000000004005" + } + } diff --git a/opengsq/protocols/__init__.py b/opengsq/protocols/__init__.py index cb054dc..5a28c80 100644 --- a/opengsq/protocols/__init__.py +++ b/opengsq/protocols/__init__.py @@ -18,6 +18,7 @@ from opengsq.protocols.gamespy3 import GameSpy3 from opengsq.protocols.gamespy4 import GameSpy4 from opengsq.protocols.halo1 import Halo1 +from opengsq.protocols.jediknight import JediKnight from opengsq.protocols.kaillera import Kaillera from opengsq.protocols.killingfloor import KillingFloor from opengsq.protocols.minecraft import Minecraft @@ -33,6 +34,7 @@ from opengsq.protocols.scum import Scum from opengsq.protocols.source import Source from opengsq.protocols.ssc import SSC +from opengsq.protocols.supcom import SupCom from opengsq.protocols.stronghold_ce import StrongholdCE from opengsq.protocols.stronghold_crusader import StrongholdCrusader from opengsq.protocols.teamspeak3 import TeamSpeak3 diff --git a/opengsq/protocols/jediknight.py b/opengsq/protocols/jediknight.py new file mode 100644 index 0000000..77ea225 --- /dev/null +++ b/opengsq/protocols/jediknight.py @@ -0,0 +1,231 @@ +from __future__ import annotations + +import re +from opengsq.binary_reader import BinaryReader +from opengsq.exceptions import InvalidPacketException +from opengsq.protocol_base import ProtocolBase +from opengsq.protocol_socket import UdpClient +from opengsq.responses.jediknight import Info, Status, JediKnightStatus +from opengsq.responses.jediknight.status import Player + + +class JediKnight(ProtocolBase): + """ + This class represents the Star Wars Jedi Knight - Jedi Academy Protocol. + It provides methods to interact with Jedi Academy servers. + """ + + full_name = "Star Wars Jedi Knight - Jedi Academy Protocol" + + def __init__(self, host: str, port: int = 29070, timeout: float = 5.0): + """ + Initializes the JediKnight object with the given parameters. + + :param host: The host of the server. + :param port: The port of the server (default: 29070). + :param timeout: The timeout for the server connection. + """ + super().__init__(host, port, timeout) + self._source_port = 29070 # Jedi Academy requires source port 29070 + + async def get_info(self, challenge: str = "xxx") -> Info: + """ + Asynchronously retrieves the server information. + + :param challenge: The challenge string to send (default: "xxx"). + :return: An Info object containing the server information. + """ + # Construct the getinfo payload: ffffffff676574696e666f20787878 + payload = b"\xFF\xFF\xFF\xFF" + b"getinfo " + challenge.encode('ascii') + + response_data = await UdpClient.communicate(self, payload, source_port=self._source_port) + + # Parse the response + br = BinaryReader(response_data) + + # Skip the header (4 bytes of 0xFF) + header = br.read_bytes(4) + if header != b"\xFF\xFF\xFF\xFF": + raise InvalidPacketException( + f"Invalid packet header. Expected: \\xFF\\xFF\\xFF\\xFF. Received: {header.hex()}" + ) + + # Read the response type + response_type = br.read_string([b'\n']) + if response_type != "infoResponse": + raise InvalidPacketException( + f"Unexpected response type. Expected: infoResponse. Received: {response_type}" + ) + + # Parse the key-value pairs + info_data = self._parse_key_value_pairs(br) + + return Info(info_data) + + async def get_status(self) -> Status: + """ + Asynchronously retrieves the server status. + + :return: A Status object containing the server status. + """ + # Construct the getstatus payload: ffffffff676574737461747573 + payload = b"\xFF\xFF\xFF\xFF" + b"getstatus" + + response_data = await UdpClient.communicate(self, payload, source_port=self._source_port) + + # Parse the response + br = BinaryReader(response_data) + + # Skip the header (4 bytes of 0xFF) + header = br.read_bytes(4) + if header != b"\xFF\xFF\xFF\xFF": + raise InvalidPacketException( + f"Invalid packet header. Expected: \\xFF\\xFF\\xFF\\xFF. Received: {header.hex()}" + ) + + # Read the response type + response_type = br.read_string([b'\n']) + if response_type != "statusResponse": + raise InvalidPacketException( + f"Unexpected response type. Expected: statusResponse. Received: {response_type}" + ) + + # Parse the key-value pairs and players + status_data, players = self._parse_status_response(br) + + return Status(status_data, players) + + async def get_full_status(self, challenge: str = "xxx") -> JediKnightStatus: + """ + Asynchronously retrieves both server info and status. + + :param challenge: The challenge string to send (default: "xxx"). + :return: A JediKnightStatus object containing both info and status. + """ + import asyncio + + # Add a small delay between requests to avoid socket conflicts + info = await self.get_info(challenge) + await asyncio.sleep(0.1) # 100ms delay + status = await self.get_status() + + return JediKnightStatus(info=info, status=status) + + def _parse_key_value_pairs(self, br: BinaryReader) -> dict[str, str]: + """ + Parses key-value pairs from the binary reader. + Jedi Academy uses backslash ( \\ ) as delimiter between keys and values. + + :param br: The BinaryReader object to parse from. + :return: A dictionary containing the parsed key-value pairs. + """ + data = {} + + # Read the remaining data as string + remaining_data = br.read().decode('ascii', errors='ignore') + + # Split by backslash and process pairs + parts = remaining_data.split('\\') + + # Remove empty first element if it exists (starts with \) + if parts and parts[0] == '': + parts = parts[1:] + + # Process pairs (key, value, key, value, ...) + for i in range(0, len(parts) - 1, 2): + if i + 1 < len(parts): + key = parts[i].strip() + value = parts[i + 1].strip() + if key: # Only add non-empty keys + data[key] = value + + return data + + def _parse_status_response(self, br: BinaryReader) -> tuple[dict[str, str], list[Player]]: + """ + Parses the status response which contains key-value pairs followed by player info. + Player info format: score ping "name" + + :param br: The BinaryReader object to parse from. + :return: A tuple of (status_data dict, players list). + """ + data = {} + players = [] + + # Read the remaining data as string + remaining_data = br.read().decode('ascii', errors='ignore') + + # Split by newline to separate server info from player info + lines = remaining_data.split('\n') + + if lines: + # First line contains server info (key-value pairs) + server_info = lines[0] + parts = server_info.split('\\') + + # Remove empty first element if it exists (starts with \) + if parts and parts[0] == '': + parts = parts[1:] + + # Process pairs (key, value, key, value, ...) + for i in range(0, len(parts) - 1, 2): + if i + 1 < len(parts): + key = parts[i].strip() + value = parts[i + 1].strip() + if key: # Only add non-empty keys + data[key] = value + + # Parse player lines (format: score ping "name") + player_pattern = re.compile(r'^(-?\d+)\s+(-?\d+)\s+"([^"]*)"') + for line in lines[1:]: + line = line.strip() + if not line: + continue + match = player_pattern.match(line) + if match: + players.append(Player( + score=int(match.group(1)), + ping=int(match.group(2)), + name=match.group(3) + )) + + return data, players + + +if __name__ == "__main__": + import asyncio + + async def main_async(): + # Test with a Jedi Academy server + jk = JediKnight(host="127.0.0.1", port=29070, timeout=5.0) + + try: + print("Getting server info...") + info = await jk.get_info() + print(f"Info: {info}") + print(f"Hostname: {info.hostname}") + print(f"Map: {info.mapname}") + print(f"Gametype: {info.gametype} ({info.gametype_translated})") + print(f"Players: {info.clients}/{info.sv_maxclients}") + + print("\n" + "="*50) + print("Getting server status...") + await asyncio.sleep(0.2) # Wait a bit before next request + status = await jk.get_status() + print(f"Status: {status}") + print(f"Server Name: {status.sv_hostname}") + print(f"Version: {status.version}") + print(f"Game: {status.gamename}") + print(f"Gametype: {status.g_gametype} ({status.g_gametype_translated})") + print(f"Players ({len(status.players)}):") + for player in status.players: + print(f" - {player.name}: Score={player.score}, Ping={player.ping}") + + except Exception as e: + print(f"Error: {e}") + import traceback + traceback.print_exc() + + asyncio.run(main_async()) + + diff --git a/opengsq/protocols/supcom.py b/opengsq/protocols/supcom.py new file mode 100644 index 0000000..ce1a614 --- /dev/null +++ b/opengsq/protocols/supcom.py @@ -0,0 +1,520 @@ +""" +Supreme Commander Protocol Implementation + +This module implements the network discovery protocol for Supreme Commander games. +The protocol works by sending a UDP broadcast to port 15000 and receiving responses +from game servers. + +Protocol Details: +- Request: UDP broadcast to port 15000 with payload 0x6e 0x03 0x00 +- Response: Key-value pairs with 0x01 prefix for strings, null-terminated +- Local port: 55582 for receiving responses + +Supported Games: +- Supreme Commander (SC1) +- Supreme Commander: Forged Alliance (SCFA) +""" + +from __future__ import annotations + +import struct +import asyncio +import socket +from typing import Dict, Any, Optional, List, Tuple + +from opengsq.binary_reader import BinaryReader +from opengsq.protocol_base import ProtocolBase +from opengsq.responses.supcom import Status + + +class SupCom(ProtocolBase): + """ + Supreme Commander Protocol + + Implements the network discovery protocol for Supreme Commander games. + Uses UDP broadcast on port 15000 with responses on port 55582. + """ + + full_name = "Supreme Commander Protocol" + + # Protocol constants + QUERY_PORT = 15000 # Port to send broadcast queries to + RESPONSE_PORT = 55582 # Local port to receive responses on + + # Request/Response markers + REQUEST_MARKER = 0x6E # 'n' - network query + RESPONSE_MARKER = 0x6F # 'o' - ok/output response + STRING_TYPE = 0x01 # String value marker + BLOCK_START = 0x04 # Start of options block + BLOCK_END = 0x05 # End of block + + # Request payload + QUERY_PAYLOAD = bytes([0x6E, 0x03, 0x00]) # 'n' + version 3 + terminator + + def __init__(self, host: str = "255.255.255.255", port: int = QUERY_PORT, timeout: float = 5.0): + """ + Initialize the Supreme Commander protocol. + + Args: + host: Target host or broadcast address (default: 255.255.255.255) + port: Query port (default: 15000) + timeout: Connection timeout in seconds (default: 5.0) + """ + super().__init__(host, port, timeout) + + async def get_status(self) -> Status: + """ + Query a Supreme Commander server for its status. + + For direct server queries (non-broadcast), sends a query to the specific + host and port, then parses the response. + + Returns: + Status: Parsed server status information + + Raises: + Exception: If no response is received or parsing fails + """ + response = await self._send_query() + return self._parse_response(response) + + async def discover_servers(self, broadcast_addr: str = "255.255.255.255") -> List[Tuple[str, Status]]: + """ + Discover Supreme Commander servers on the local network. + + Sends a UDP broadcast query and collects all responses. + + Args: + broadcast_addr: Broadcast address to use (default: 255.255.255.255) + + Returns: + List of tuples containing (server_ip, Status) + """ + servers = [] + + try: + loop = asyncio.get_running_loop() + responses = [] + + # Create protocol for collecting responses + class ResponseCollector(asyncio.DatagramProtocol): + def __init__(self): + self.transport = None + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data: bytes, addr: Tuple[str, int]): + responses.append((data, addr)) + + def error_received(self, exc): + pass + + # Create UDP socket on RESPONSE_PORT for both sending AND receiving + # Supreme Commander servers respond to the source port of the query, + # so we must send FROM port 55582, not just listen on it + # Use SO_REUSEADDR to allow rapid re-binding after previous scan + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.bind(('0.0.0.0', self.RESPONSE_PORT)) + sock.setblocking(False) + + transport, protocol = await loop.create_datagram_endpoint( + ResponseCollector, + sock=sock + ) + + try: + # Send broadcast query FROM port 55582 + transport.sendto(self.QUERY_PAYLOAD, (broadcast_addr, self.QUERY_PORT)) + + # Wait for responses + await asyncio.sleep(self._timeout) + + # Parse all responses + for data, addr in responses: + try: + status = self._parse_response(data) + servers.append((addr[0], status)) + except Exception: + continue + + finally: + transport.close() + + except Exception as e: + raise Exception(f"Discovery failed: {e}") + + return servers + + async def _send_query(self) -> bytes: + """ + Send a query to the configured host and receive the response. + + Returns: + bytes: Raw response data + """ + loop = asyncio.get_running_loop() + response_future = loop.create_future() + + class QueryProtocol(asyncio.DatagramProtocol): + def __init__(self): + self.transport = None + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data: bytes, addr: Tuple[str, int]): + if not response_future.done(): + response_future.set_result(data) + + def error_received(self, exc): + if not response_future.done(): + response_future.set_exception(exc) + + # For broadcast queries, we need to listen on the response port + is_broadcast = self._host in ('255.255.255.255', '') + local_port = self.RESPONSE_PORT if is_broadcast else 0 + + transport, protocol = await loop.create_datagram_endpoint( + QueryProtocol, + local_addr=('0.0.0.0', local_port), + allow_broadcast=True + ) + + try: + # Send query + transport.sendto(self.QUERY_PAYLOAD, (self._host, self._port)) + + # Wait for response + response = await asyncio.wait_for(response_future, timeout=self._timeout) + return response + + finally: + transport.close() + + def _parse_response(self, data: bytes) -> Status: + """ + Parse a Supreme Commander server response. + + The response format is: + - Header: 0x6F (response marker) + 2 bytes length (little endian) + header info + - Data: Key-value pairs with 0x01 prefix for strings + + Args: + data: Raw response bytes + + Returns: + Status: Parsed server status + + Raises: + Exception: If response is invalid or parsing fails + """ + if len(data) < 10: + raise Exception(f"Response too short: {len(data)} bytes") + + # Verify response marker + if data[0] != self.RESPONSE_MARKER: + raise Exception(f"Invalid response marker: 0x{data[0]:02x}, expected 0x{self.RESPONSE_MARKER:02x}") + + # Parse header + # Bytes 1-2: Length (little endian) + response_length = struct.unpack(' 0: + marker = br.read_byte() + + if marker == self.STRING_TYPE: + # Read key + key = br.read_string() + + if br.remaining_bytes() == 0: + break + + # Read value marker + value_marker = br.read_byte() + + if value_marker == self.STRING_TYPE: + # String value + value = br.read_string() + + if in_options_block: + options[key] = value + else: + parsed_data[key] = value + + elif value_marker == 0x00: + # Possibly a float value (4 bytes after 0x00) + if br.remaining_bytes() >= 4: + float_bytes = br.read_bytes(4) + try: + float_val = struct.unpack('= 1: + bool_byte = br.read_byte() + bool_val = bool_byte == 0x01 + if in_options_block: + options[key] = bool_val + else: + parsed_data[key] = bool_val + else: + # Unknown value marker, skip + pass + + elif marker == self.BLOCK_START: + # Start of options block + in_options_block = True + + elif marker == self.BLOCK_END: + # End of options block + in_options_block = False + + elif marker == 0x00: + # Null byte, skip + continue + + else: + # Unknown marker, skip + continue + + # Merge all data - Options block detection might not work perfectly, + # so we look in both dictionaries + all_data = {**options, **parsed_data} + + # Helper to get value from either dict + def get_val(key: str, default: Any = None) -> Any: + return all_data.get(key, default) + + # Parse cheats_enabled as boolean + cheats_str = str(get_val('CheatsEnabled', 'false')).lower() + cheats_enabled = cheats_str == 'true' + + # Parse team_lock + team_lock = get_val('TeamLock', 'unlocked') + + # Get scenario file and extract map info from lookup table + scenario_file = get_val('ScenarioFile', '') + max_players, lookup_map_name, map_size = self._extract_map_info(scenario_file) + + # Build Status object + return Status( + game_name=get_val('GameName', 'Unknown'), + hosted_by=get_val('HostedBy', 'Unknown'), + product_code=get_val('ProductCode', 'SC1'), + scenario_file=scenario_file, + num_players=int(get_val('PlayerCount', 0)), + max_players=max_players, + map_width=map_size[0] if map_size else 0, + map_height=map_size[1] if map_size else 0, + map_name_lookup=lookup_map_name or "", + game_speed=get_val('GameSpeed', 'normal'), + victory_condition=get_val('Victory', 'demoralization'), + fog_of_war=get_val('FogOfWar', 'explored'), + unit_cap=get_val('UnitCap', '500'), + cheats_enabled=cheats_enabled, + team_lock=team_lock, + team_spawn=get_val('TeamSpawn', 'random'), + allow_observers=get_val('AllowObservers', True), + no_rush_option=get_val('NoRushOption', 'Off'), + prebuilt_units=get_val('PrebuiltUnits', 'Off'), + civilian_alliance=get_val('CivilianAlliance', 'enemy'), + timeouts=get_val('Timeouts', '3'), + options=all_data, + raw={ + 'parsed_data': parsed_data, + 'options': options, + 'all_data': all_data, + 'response_length': response_length, + 'raw_hex': data.hex() + } + ) + + # Supreme Commander standard map data + # Auto-generated from scenario files in /maps folder + # Format: 'map_id': {'name': 'Map Name', 'players': max_players, 'size': (width, height)} + # Size units: 256=5km, 512=10km, 1024=20km, 2048=40km, 4096=80km + SCMP_MAP_DATA = { + # 2 Player Maps + 'scmp_012': {'name': 'Theta Passage', 'players': 2, 'size': (256, 256)}, + 'scmp_013': {'name': 'Winter Duel', 'players': 2, 'size': (256, 256)}, + 'scmp_016': {'name': 'Canis River', 'players': 2, 'size': (256, 256)}, + 'scmp_019': {'name': 'Finn', 'players': 2, 'size': (512, 512)}, + 'scmp_023': {'name': 'Varga Pass', 'players': 2, 'size': (512, 512)}, + # 3 Player Maps + 'scmp_018': {'name': 'Sentry Point', 'players': 3, 'size': (256, 256)}, + 'scmp_037': {'name': 'Sludge', 'players': 3, 'size': (256, 256)}, + # 4 Player Maps + 'scmp_003': {'name': 'Drake', 'players': 4, 'size': (1024, 1024)}, + 'scmp_004': {'name': 'Emerald Crater', 'players': 4, 'size': (1024, 1024)}, + 'scmp_006': {'name': 'Ian', 'players': 4, 'size': (1024, 1024)}, + 'scmp_015': {'name': 'Fields of Isis', 'players': 4, 'size': (512, 512)}, + 'scmp_017': {'name': 'Syrtis Major', 'players': 4, 'size': (512, 512)}, + 'scmp_022': {'name': 'Arctic Refuge', 'players': 4, 'size': (512, 512)}, + 'scmp_026': {'name': 'Vya-3 Protectorate', 'players': 4, 'size': (512, 512)}, + 'scmp_031': {'name': 'Four-Leaf Clover', 'players': 4, 'size': (512, 512)}, + 'scmp_032': {'name': 'The Wilderness', 'players': 4, 'size': (512, 512)}, + 'scmp_034': {'name': 'High Noon', 'players': 4, 'size': (512, 512)}, + 'scmp_035': {'name': 'Paradise', 'players': 4, 'size': (512, 512)}, + 'scmp_036': {'name': 'Blasted Rock', 'players': 4, 'size': (256, 256)}, + 'scmp_038': {'name': 'Ambush Pass', 'players': 4, 'size': (256, 256)}, + 'scmp_039': {'name': 'Four-Corners', 'players': 4, 'size': (256, 256)}, + # 5 Player Maps + 'scmp_010': {'name': 'Sung Island', 'players': 5, 'size': (1024, 1024)}, + # 6 Player Maps + 'scmp_007': {'name': 'Open Palms', 'players': 6, 'size': (512, 512)}, + 'scmp_020': {'name': 'Roanoke Abyss', 'players': 6, 'size': (1024, 1024)}, + 'scmp_024': {'name': 'Crossfire Canal', 'players': 6, 'size': (1024, 1024)}, + 'scmp_025': {'name': 'Saltrock Colony', 'players': 6, 'size': (512, 512)}, + 'scmp_027': {'name': 'The Scar', 'players': 6, 'size': (1024, 1024)}, + 'scmp_033': {'name': 'White Fire', 'players': 6, 'size': (512, 512)}, + 'scmp_040': {'name': 'The Ditch', 'players': 6, 'size': (1024, 1024)}, + # 7 Player Maps + 'scmp_005': {'name': "Gentleman's Reef", 'players': 7, 'size': (2048, 2048)}, + # 8 Player Maps + 'scmp_001': {'name': 'Burial Mounds', 'players': 8, 'size': (1024, 1024)}, + 'scmp_002': {'name': 'Concord Lake', 'players': 8, 'size': (1024, 1024)}, + 'scmp_008': {'name': 'Seraphim Glaciers', 'players': 8, 'size': (1024, 1024)}, + 'scmp_009': {'name': "Seton's Clutch", 'players': 8, 'size': (1024, 1024)}, + 'scmp_011': {'name': 'The Great Void', 'players': 8, 'size': (2048, 2048)}, + 'scmp_014': {'name': 'The Bermuda Locket', 'players': 8, 'size': (1024, 1024)}, + 'scmp_021': {'name': 'Alpha 7 Quarantine', 'players': 8, 'size': (2048, 2048)}, + 'scmp_028': {'name': 'Hanna Oasis', 'players': 8, 'size': (2048, 2048)}, + 'scmp_029': {'name': 'Betrayal Ocean', 'players': 8, 'size': (4096, 4096)}, + 'scmp_030': {'name': 'Frostmill Ruins', 'players': 8, 'size': (4096, 4096)}, + # Forged Alliance Maps (x1mp_*) - sizes estimated + 'x1mp_001': {'name': 'Loki', 'players': 2, 'size': (256, 256)}, + 'x1mp_002': {'name': 'Minerva', 'players': 2, 'size': (256, 256)}, + 'x1mp_003': {'name': 'Nowhere', 'players': 2, 'size': (256, 256)}, + 'x1mp_004': {'name': 'Sphinx', 'players': 2, 'size': (256, 256)}, + 'x1mp_005': {'name': 'Desert', 'players': 4, 'size': (512, 512)}, + 'x1mp_006': {'name': 'Eye of the Storm', 'players': 4, 'size': (512, 512)}, + 'x1mp_007': {'name': 'Forbidden Pass', 'players': 4, 'size': (512, 512)}, + 'x1mp_008': {'name': 'Shards', 'players': 4, 'size': (512, 512)}, + 'x1mp_009': {'name': 'Cauldron', 'players': 6, 'size': (1024, 1024)}, + 'x1mp_010': {'name': 'Emerald City', 'players': 6, 'size': (1024, 1024)}, + 'x1mp_011': {'name': "Finn's Revenge", 'players': 6, 'size': (1024, 1024)}, + 'x1mp_012': {'name': 'Flooded Strip Mine', 'players': 6, 'size': (1024, 1024)}, + 'x1mp_014': {'name': 'Strip Mine', 'players': 8, 'size': (2048, 2048)}, + 'x1mp_017': {'name': 'Setons Clutch II', 'players': 8, 'size': (1024, 1024)}, + } + + def _extract_map_info(self, scenario_file: str) -> tuple: + """ + Extract map information from scenario filename using lookup table. + + Supreme Commander does not send max_players, map name, or size in the + protocol response, so we use a lookup table for known standard maps. + + Args: + scenario_file: Path to scenario file (e.g., /maps/scmp_039/scmp_039_scenario.lua) + + Returns: + tuple: (max_players, map_name, size) - 0, None, None for unknown maps + size is a tuple (width, height) in game units + """ + if not scenario_file: + return 0, None, None + + # Extract map ID from path (e.g., "scmp_039" from "/maps/scmp_039/scmp_039_scenario.lua") + import re + match = re.search(r'(scmp_\d+|x1mp_\d+)', scenario_file.lower()) + if match: + map_id = match.group(1) + map_data = self.SCMP_MAP_DATA.get(map_id) + if map_data: + return map_data['players'], map_data['name'], map_data.get('size') + + return 0, None, None + + def _extract_max_players(self, scenario_file: str) -> int: + """ + Extract maximum players from scenario filename. + + Args: + scenario_file: Path to scenario file + + Returns: + int: Max players for known maps, 0 for unknown maps + """ + max_players, _ = self._extract_map_info(scenario_file) + return max_players + + +if __name__ == "__main__": + import asyncio + + async def main_async(): + print("Supreme Commander Server Discovery") + print("=" * 50) + + # Test direct query to a specific server + supcom = SupCom(host="172.29.100.29", port=15000, timeout=5.0) + + try: + print(f"\nQuerying server at {supcom._host}:{supcom._port}...") + status = await supcom.get_status() + + print(f"\nServer Status:") + print(f" Game Name: {status.game_name}") + print(f" Hosted By: {status.hosted_by}") + print(f" Product: {status.game_title}") + print(f" Map: {status.map_name}") + print(f" Players: {status.num_players}/{status.max_players}") + print(f" Game Speed: {status.game_speed}") + print(f" Victory: {status.victory_condition}") + print(f" Unit Cap: {status.unit_cap}") + print(f" Cheats: {status.cheats_enabled}") + print(f"\nFull Options:") + for key, value in status.options.items(): + print(f" {key}: {value}") + + except Exception as e: + print(f"Error: {e}") + + # Test broadcast discovery + print("\n" + "=" * 50) + print("Broadcasting for servers on local network...") + + try: + supcom_broadcast = SupCom(timeout=3.0) + servers = await supcom_broadcast.discover_servers() + + print(f"\nDiscovered {len(servers)} server(s):") + for ip, server_status in servers: + print(f"\n Server at {ip}:") + print(f" Game: {server_status.game_name}") + print(f" Host: {server_status.hosted_by}") + print(f" Players: {server_status.num_players}/{server_status.max_players}") + + except Exception as e: + print(f"Broadcast discovery error: {e}") + + asyncio.run(main_async()) + diff --git a/opengsq/responses/jediknight/__init__.py b/opengsq/responses/jediknight/__init__.py new file mode 100644 index 0000000..5ce59d5 --- /dev/null +++ b/opengsq/responses/jediknight/__init__.py @@ -0,0 +1,5 @@ +from .info import Info +from .status import Status +from .jediknight_status import JediKnightStatus + + diff --git a/opengsq/responses/jediknight/info.py b/opengsq/responses/jediknight/info.py new file mode 100644 index 0000000..82ee5df --- /dev/null +++ b/opengsq/responses/jediknight/info.py @@ -0,0 +1,92 @@ +from dataclasses import dataclass + + +def translate_gametype(gametype_code: str) -> str: + """ + Translate Jedi Academy gametype codes to display names. + + :param gametype_code: The gametype code from the server + :return: Display name for the gametype + """ + gametype_translations = { + '0': 'Free For All', + '3': 'Duel', + '4': 'Power Duel', + '6': 'Team FFA', + '7': 'Siege', + '8': 'Capture the Flag', + } + + return gametype_translations.get(str(gametype_code), gametype_code) + + +@dataclass +class Info: + """ + Represents the info response from a Star Wars Jedi Knight - Jedi Academy server. + """ + + fdisable: str = "" + """Force powers disable flags.""" + + wdisable: str = "" + """Weapons disable flags.""" + + truejedi: str = "" + """True Jedi mode enabled.""" + + needpass: str = "" + """Password required.""" + + gametype: str = "" + """Game type.""" + + sv_maxclients: str = "" + """Maximum clients.""" + + clients: str = "" + """Current clients.""" + + mapname: str = "" + """Current map name.""" + + hostname: str = "" + """Server hostname.""" + + protocol: str = "" + """Protocol version.""" + + challenge: str = "" + """Challenge string.""" + + def __init__(self, data: dict[str, str]): + """ + Initialize Info object from parsed data dictionary. + + :param data: Dictionary containing server information + """ + for key, value in data.items(): + if hasattr(self, key): + setattr(self, key, value) + + @property + def gametype_translated(self) -> str: + """ + Get the translated gametype name. + + :return: Display name for the gametype + """ + return translate_gametype(self.gametype) + + def __getattribute__(self, name): + if name == '__dict__': + # Create a custom dict that includes properties + result = {} + # Get the original __dict__ first + original_dict = object.__getattribute__(self, '__dict__') + result.update(original_dict) + # Add the translated gametype + result['gametype_translated'] = self.gametype_translated + return result + return object.__getattribute__(self, name) + diff --git a/opengsq/responses/jediknight/jediknight_status.py b/opengsq/responses/jediknight/jediknight_status.py new file mode 100644 index 0000000..3fce923 --- /dev/null +++ b/opengsq/responses/jediknight/jediknight_status.py @@ -0,0 +1,19 @@ +from dataclasses import dataclass +from .info import Info +from .status import Status + + +@dataclass +class JediKnightStatus: + """ + Represents the combined status information from a Star Wars Jedi Knight - Jedi Academy server. + Contains both info and status responses. + """ + + info: Info + """The server info response.""" + + status: Status + """The server status response.""" + + diff --git a/opengsq/responses/jediknight/status.py b/opengsq/responses/jediknight/status.py new file mode 100644 index 0000000..6f4f78b --- /dev/null +++ b/opengsq/responses/jediknight/status.py @@ -0,0 +1,216 @@ +from dataclasses import dataclass, field +from typing import List + + +def translate_gametype(gametype_code: str) -> str: + """ + Translate Jedi Academy gametype codes to display names. + + :param gametype_code: The gametype code from the server + :return: Display name for the gametype + """ + gametype_translations = { + '0': 'Free For All', + '3': 'Duel', + '4': 'Power Duel', + '6': 'Team FFA', + '7': 'Siege', + '8': 'Capture the Flag', + } + + return gametype_translations.get(str(gametype_code), gametype_code) + + +@dataclass +class Player: + """ + Represents a player on a Jedi Academy server. + """ + score: int = 0 + """Player score.""" + + ping: int = 0 + """Player ping.""" + + name: str = "" + """Player name.""" + + +@dataclass +class Status: + """ + Represents the status response from a Star Wars Jedi Knight - Jedi Academy server. + """ + + challenge: str = "" + """Challenge string.""" + + capturelimit: str = "" + """Capture limit for CTF.""" + + sv_hostname: str = "" + """Server hostname.""" + + sv_maxRate: str = "" + """Maximum rate.""" + + sv_minPing: str = "" + """Minimum ping.""" + + sv_maxPing: str = "" + """Maximum ping.""" + + sv_floodProtect: str = "" + """Flood protection.""" + + g_siegeTeamSwitch: str = "" + """Siege team switch.""" + + version: str = "" + """Server version.""" + + dmflags: str = "" + """Deathmatch flags.""" + + fraglimit: str = "" + """Frag limit.""" + + timelimit: str = "" + """Time limit.""" + + g_maxHolocronCarry: str = "" + """Maximum holocrons to carry.""" + + g_privateDuel: str = "" + """Private duel enabled.""" + + g_saberLocking: str = "" + """Saber locking enabled.""" + + g_maxForceRank: str = "" + """Maximum force rank.""" + + duel_fraglimit: str = "" + """Duel frag limit.""" + + g_forceBasedTeams: str = "" + """Force based teams.""" + + g_duelWeaponDisable: str = "" + """Duel weapon disable.""" + + g_gametype: str = "" + """Game type.""" + + g_needpass: str = "" + """Password required.""" + + protocol: str = "" + """Protocol version.""" + + mapname: str = "" + """Current map name.""" + + sv_privateClients: str = "" + """Private clients.""" + + sv_maxclients: str = "" + """Maximum clients.""" + + sv_allowDownload: str = "" + """Allow downloads.""" + + bot_minplayers: str = "" + """Minimum bot players.""" + + g_debugMelee: str = "" + """Debug melee.""" + + g_stepSlideFix: str = "" + """Step slide fix.""" + + g_noSpecMove: str = "" + """No spectator movement.""" + + gamename: str = "" + """Game name (basejka for base game).""" + + g_maxGameClients: str = "" + """Maximum game clients.""" + + g_jediVmerc: str = "" + """Jedi vs Merc mode.""" + + g_allowNPC: str = "" + """Allow NPCs.""" + + g_forceRegenTime: str = "" + """Force regeneration time.""" + + g_forcePowerDisable: str = "" + """Force power disable flags.""" + + g_weaponDisable: str = "" + """Weapon disable flags.""" + + g_siegeRespawn: str = "" + """Siege respawn time.""" + + g_saberWallDamageScale: str = "" + """Saber wall damage scale.""" + + bg_fighterAltControl: str = "" + """Fighter alt control.""" + + g_siegeTeam1: str = "" + """Siege team 1.""" + + g_siegeTeam2: str = "" + """Siege team 2.""" + + g_showDuelHealths: str = "" + """Show duel healths.""" + + players: List[Player] = field(default_factory=list) + """List of players on the server.""" + + def __init__(self, data: dict[str, str], players: List[Player] = None): + """ + Initialize Status object from parsed data dictionary. + + :param data: Dictionary containing server status information + :param players: List of players on the server + """ + for key, value in data.items(): + # Handle potential typos in server response + if key == 'g_saberWeallDamageScale': + setattr(self, 'g_saberWallDamageScale', value) + elif key == 'g_debugeMelee': + setattr(self, 'g_debugMelee', value) + elif hasattr(self, key): + setattr(self, key, value) + + self.players = players if players is not None else [] + + @property + def g_gametype_translated(self) -> str: + """ + Get the translated gametype name. + + :return: Display name for the gametype + """ + return translate_gametype(self.g_gametype) + + def __getattribute__(self, name): + if name == '__dict__': + # Create a custom dict that includes properties + result = {} + # Get the original __dict__ first + original_dict = object.__getattribute__(self, '__dict__') + result.update(original_dict) + # Add the translated gametype + result['g_gametype_translated'] = self.g_gametype_translated + return result + return object.__getattribute__(self, name) + + diff --git a/opengsq/responses/supcom/__init__.py b/opengsq/responses/supcom/__init__.py new file mode 100644 index 0000000..a0b0742 --- /dev/null +++ b/opengsq/responses/supcom/__init__.py @@ -0,0 +1,4 @@ +from opengsq.responses.supcom.status import Status + +__all__ = ["Status"] + diff --git a/opengsq/responses/supcom/status.py b/opengsq/responses/supcom/status.py new file mode 100644 index 0000000..125dd2c --- /dev/null +++ b/opengsq/responses/supcom/status.py @@ -0,0 +1,179 @@ +from dataclasses import dataclass, field +from typing import Dict, Any, Optional + + +@dataclass +class Status: + """ + Represents the status of a Supreme Commander game server. + + Attributes: + game_name: Name of the game lobby + hosted_by: Name of the host player + product_code: Product code (SC1 = Supreme Commander, SCFA = Forged Alliance) + scenario_file: Path to the map/scenario file + num_players: Current number of players + max_players: Maximum number of players (from map lookup table) + map_width: Map width in game units (from lookup table, 0 if unknown) + map_height: Map height in game units (from lookup table, 0 if unknown) + map_name_lookup: Map name from lookup table (empty if unknown) + game_speed: Game speed setting (slow/normal/fast) + victory_condition: Victory condition type + fog_of_war: Fog of war setting + unit_cap: Unit cap setting + cheats_enabled: Whether cheats are enabled + team_lock: Team lock setting + team_spawn: Team spawn setting + allow_observers: Whether observers are allowed + no_rush_option: No rush timer setting + prebuilt_units: Prebuilt units setting + civilian_alliance: Civilian alliance setting + timeouts: Number of allowed timeouts + options: Full options dictionary + raw: Raw response data for debugging + """ + game_name: str + hosted_by: str + product_code: str + scenario_file: str + num_players: int + max_players: int = 8 + map_width: int = 0 + map_height: int = 0 + map_name_lookup: str = "" + game_speed: str = "normal" + victory_condition: str = "demoralization" + fog_of_war: str = "explored" + unit_cap: str = "500" + cheats_enabled: bool = False + team_lock: str = "unlocked" + team_spawn: str = "random" + allow_observers: bool = True + no_rush_option: str = "Off" + prebuilt_units: str = "Off" + civilian_alliance: str = "enemy" + timeouts: str = "3" + options: Dict[str, Any] = field(default_factory=dict) + raw: Dict[str, Any] = field(default_factory=dict) + + @property + def map_name(self) -> str: + """ + Get the map name. + + Prefers the lookup table name (map_name_lookup) if available, + otherwise falls back to extracting from scenario file path. + """ + # Try to get name from lookup table + if self.map_name_lookup: + return self.map_name_lookup + + # Fallback: extract from scenario file path + if not self.scenario_file: + return "Unknown Map" + + # Extract filename from path like "/maps/scmp_039/scmp_039_scenario.lua" + parts = self.scenario_file.replace('\\', '/').split('/') + if len(parts) >= 2: + # Return the map folder name + return parts[-2] if parts[-2] else parts[-1].replace('_scenario.lua', '') + return self.scenario_file + + @property + def map_id(self) -> str: + """Extract map ID from scenario file path (e.g., 'scmp_039')""" + if not self.scenario_file: + return "" + + import re + match = re.search(r'(scmp_\d+|x1mp_\d+)', self.scenario_file.lower()) + return match.group(1) if match else "" + + @property + def players_display(self) -> str: + """Get formatted player count string (handles unknown max_players)""" + if self.max_players == 0: + return f"{self.num_players}/?" + return f"{self.num_players}/{self.max_players}" + + @property + def max_players_known(self) -> bool: + """Check if max_players is known (from lookup table)""" + return self.max_players > 0 + + @property + def map_size(self) -> Optional[tuple]: + """ + Get the map size as (width, height) tuple in game units. + + Returns None if map is not in lookup table. + Size units: 256=5km, 512=10km, 1024=20km, 2048=40km, 4096=80km + """ + if self.map_width > 0 and self.map_height > 0: + return (self.map_width, self.map_height) + return None + + @property + def map_size_km(self) -> Optional[tuple]: + """ + Get the map size in kilometers as (width_km, height_km) tuple. + + Returns None if map is not in lookup table. + """ + size = self.map_size + if size: + # 51.2 game units = 1 km + return (size[0] / 51.2, size[1] / 51.2) + return None + + @property + def map_size_display(self) -> str: + """ + Get a human-readable map size string. + + Returns format like "20x20 km" or "?" for unknown maps. + """ + size_km = self.map_size_km + if size_km: + return f"{int(size_km[0])}x{int(size_km[1])} km" + return "?" + + @property + def map_size_category(self) -> str: + """ + Get the map size category name. + + Categories: + - 5x5 km: Tiny + - 10x10 km: Small + - 20x20 km: Medium + - 40x40 km: Large + - 80x80 km: Huge + """ + size = self.map_size + if not size: + return "Unknown" + + width = size[0] + if width <= 256: + return "Tiny (5x5 km)" + elif width <= 512: + return "Small (10x10 km)" + elif width <= 1024: + return "Medium (20x20 km)" + elif width <= 2048: + return "Large (40x40 km)" + else: + return "Huge (80x80 km)" + + @property + def game_title(self) -> str: + """Get human-readable game title from product code""" + titles = { + 'SC1': 'Supreme Commander', + 'SCFA': 'Supreme Commander: Forged Alliance', + 'SC2': 'Supreme Commander 2', + 'FAF': 'Forged Alliance Forever' + } + return titles.get(self.product_code, f'Supreme Commander ({self.product_code})') + diff --git a/tests/protocols/test_jediknight.py b/tests/protocols/test_jediknight.py new file mode 100644 index 0000000..1e6dba4 --- /dev/null +++ b/tests/protocols/test_jediknight.py @@ -0,0 +1,50 @@ +import pytest +from opengsq.protocols.jediknight import JediKnight + + +from ..result_handler import ResultHandler + +handler = ResultHandler(__file__) +handler.enable_save = True + +class TestJediKnight: + @pytest.mark.asyncio + async def test_get_info(self): + jk = JediKnight(host="172.29.100.29", port=29070, timeout=5.0) + info = await jk.get_info() + assert info is not None + # Check that we got some basic info + assert hasattr(info, 'hostname') + assert hasattr(info, 'mapname') + assert hasattr(info, 'gametype') + await handler.save_result("test_get_info", info) + + @pytest.mark.asyncio + async def test_get_status(self): + jk = JediKnight(host="172.29.100.29", port=29070, timeout=5.0) + status = await jk.get_status() + assert status is not None + # Check that we got some basic status info + assert hasattr(status, 'sv_hostname') + assert hasattr(status, 'mapname') + assert hasattr(status, 'gamename') + assert hasattr(status, 'players') + await handler.save_result("test_get_status", status) + + @pytest.mark.asyncio + async def test_get_full_status(self): + jk = JediKnight(host="172.29.100.29", port=29070, timeout=5.0) + full_status = await jk.get_full_status() + assert full_status is not None + assert full_status.info is not None + assert full_status.status is not None + await handler.save_result("test_get_full_status", full_status) + + @pytest.mark.asyncio + async def test_protocol_properties(self): + jk = JediKnight(host="172.29.100.29", port=29070) + assert jk.full_name == "Star Wars Jedi Knight - Jedi Academy Protocol" + assert jk._source_port == 29070 + await handler.save_result("test_protocol_properties", jk) + + diff --git a/tests/protocols/test_supcom.py b/tests/protocols/test_supcom.py new file mode 100644 index 0000000..b7e1989 --- /dev/null +++ b/tests/protocols/test_supcom.py @@ -0,0 +1,19 @@ +import pytest +from opengsq.protocols.supcom import SupCom + +from ..result_handler import ResultHandler + +handler = ResultHandler(__file__) +handler.enable_save = True + +# Supreme Commander +test = SupCom( + host="172.29.100.29" +) + + +@pytest.mark.asyncio +async def test_get_status(): + result = await test.get_status() + await handler.save_result("test_get_status", result) +