diff --git a/Makefile b/Makefile index ed562898..cad51e00 100644 --- a/Makefile +++ b/Makefile @@ -358,6 +358,12 @@ test: validator) SERVICES="$(SERVICES_validator)";; \ *) echo "Unknown component: $$comp"; exit 1;; \ esac; \ + if [ -d "subvortex/$$comp/core" ]; then \ + echo "🔍 Testing subvortex/$$comp/core..."; \ + PYTHONPATH=. pytest subvortex/$$comp/core || test $$? -eq 5 || exit $$?; \ + else \ + echo "⚠️ Warning: subvortex/$$comp/core not found, skipping..."; \ + fi; \ for svc in $$SERVICES; do \ svc_path=subvortex/$$comp/$$svc; \ if [ -d "$$svc_path" ]; then \ diff --git a/subvortex/core/core_bittensor/wallet.py b/subvortex/core/core_bittensor/wallet.py new file mode 100644 index 00000000..9bf17670 --- /dev/null +++ b/subvortex/core/core_bittensor/wallet.py @@ -0,0 +1,16 @@ +import bittensor_wallet as btw +import bittensor_wallet.mock as btwm + +def get_mock_wallet(coldkey: "btw.Keypair" = None, hotkey: "btw.Keypair" = None): + wallet = btwm.MockWallet(name="mock_wallet", hotkey="mock", path="/tmp/mock_wallet") + + if not coldkey: + coldkey = btw.Keypair.create_from_mnemonic(btw.Keypair.generate_mnemonic()) + if not hotkey: + hotkey = btw.Keypair.create_from_mnemonic(btw.Keypair.generate_mnemonic()) + + wallet.set_coldkey(coldkey, encrypt=False, overwrite=True) + wallet.set_coldkeypub(coldkey, encrypt=False, overwrite=True) + wallet.set_hotkey(hotkey, encrypt=False, overwrite=True) + + return wallet \ No newline at end of file diff --git a/subvortex/core/shared/mock.py b/subvortex/core/shared/mock.py index 63b8d96c..8f82118e 100644 --- a/subvortex/core/shared/mock.py +++ b/subvortex/core/shared/mock.py @@ -45,8 +45,7 @@ class MockSubtensor(btum.MockSubtensor): def __init__(self, netuid, n=16, wallet=None, network="mock"): super().__init__(network=network) - if not self.subnet_exists(netuid): - self.create_subnet(netuid) + self.create_subnet(netuid) # Register ourself (the validator) as a neuron at uid=0 if wallet is not None: diff --git a/subvortex/validator/challenger/Dockerfile b/subvortex/validator/challenger/Dockerfile new file mode 100644 index 00000000..bf8890dc --- /dev/null +++ b/subvortex/validator/challenger/Dockerfile @@ -0,0 +1,67 @@ +# syntax=docker/dockerfile:1.4 + +######################################## +# Stage 0 — Build wheels using wheel-builder +######################################## +FROM wheelbuilder AS wheels + +WORKDIR /build + +COPY ./subvortex/validator/challenger/requirements.txt ./requirements.txt + +RUN pip wheel -r requirements.txt -w /wheels \ + && rm -rf ~/.cache/pip + +######################################## +# Stage 1 — Final runtime image +######################################## +FROM python:3.11-slim + +# ---------- Metadata & Labels ---------- +ARG MAINTAINER="subvortex.bt@gmail.com" +ARG VERSION="0.0.0" +ARG ROLE_VERSION="0.0.0" +ARG COMPONENT_VERSION="0.0.0" +LABEL maintainer=$MAINTAINER +LABEL version=$VERSION +LABEL validator.version=$ROLE_VERSION +LABEL validator.challenger.version=$COMPONENT_VERSION + +# ---------- Environment ---------- +ENV PYTHONUNBUFFERED=1 +ARG SUBVORTEX_OBSERVER_TYPE=file +ENV SV_OBSERVER_TYPE=${SUBVORTEX_OBSERVER_TYPE} + +# ✅ Install required runtime libraries +RUN apt-get update && apt-get install -y \ + libnfnetlink0 \ + libnetfilter-queue1 \ + iptables \ + libcap2-bin \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# 🧱 Copy only built wheels from Stage 0 +COPY --from=wheels /wheels /tmp/wheels + +# ✅ Install requirements from the prebuilt wheels, then discard them +COPY 
./subvortex/validator/challenger/requirements.txt ./requirements.txt +RUN pip install --no-cache-dir --find-links=/tmp/wheels -r requirements.txt \ + && rm -rf /tmp/wheels ~/.cache/pip + +# 📁 Copy source code +COPY ./pyproject-validator.toml ./pyproject.toml +COPY ./subvortex/pyproject.toml ./subvortex/pyproject.toml +COPY ./subvortex/core ./subvortex/core +COPY ./subvortex/validator/version.py ./subvortex/validator/version.py +COPY ./subvortex/validator/core ./subvortex/validator/core +COPY ./subvortex/validator/challenger/src ./subvortex/validator/challenger/src +COPY ./subvortex/validator/challenger/entrypoint.sh ./subvortex/validator/challenger/entrypoint.sh +COPY ./subvortex/validator/challenger/pyproject.toml ./subvortex/validator/challenger/pyproject.toml + +# 🧩 Install project (editable) +RUN pip install "./subvortex/validator/challenger[observer-${SV_OBSERVER_TYPE}]" \ + && pip install -e . \ + && rm -rf ~/.cache/pip + +ENTRYPOINT ["/bin/bash", "./subvortex/validator/challenger/entrypoint.sh"] diff --git a/subvortex/validator/challenger/Makefile b/subvortex/validator/challenger/Makefile new file mode 100644 index 00000000..3203ba50 --- /dev/null +++ b/subvortex/validator/challenger/Makefile @@ -0,0 +1,22 @@ +.PHONY: bump-patch bump-minor bump-major bump-alpha bump-rc clean + +VERSION_SCRIPT = ../../../scripts/cicd/cicd_bump_version.py +DIST_DIR = ../../dist + +bump-patch: + @python3 $(VERSION_SCRIPT) patch + +bump-minor: + @python3 $(VERSION_SCRIPT) minor + +bump-major: + @python3 $(VERSION_SCRIPT) major + +bump-alpha: + @python3 $(VERSION_SCRIPT) alpha + +bump-rc: + @python3 $(VERSION_SCRIPT) rc + +clean: + rm -rf $(DIST_DIR) *.egg-info **/*.egg-info build/ \ No newline at end of file diff --git a/subvortex/validator/challenger/README.md b/subvortex/validator/challenger/README.md new file mode 100644 index 00000000..fe4d5d35 --- /dev/null +++ b/subvortex/validator/challenger/README.md @@ -0,0 +1 @@ +Coming soon \ No newline at end of file diff --git a/subvortex/validator/challenger/entrypoint.sh b/subvortex/validator/challenger/entrypoint.sh new file mode 100755 index 00000000..4c8a6d2e --- /dev/null +++ b/subvortex/validator/challenger/entrypoint.sh @@ -0,0 +1,37 @@ +#!/bin/bash -eu + +# Load environment variables +export $(grep -v '^#' .env | xargs) + +# Build CLI args from SUBVORTEX_ environment variables +ARGS=() +PREFIX="SUBVORTEX_" + +while IFS= read -r line; do + key="${line%%=*}" + value="${line#*=}" + + # Skip keys that don't start with PREFIX, and values that are empty once quotes are stripped + if [[ $key != ${PREFIX}* || -z "${value//\"/}" ]]; then + continue + fi + + key_suffix="${key#$PREFIX}" + cli_key="--$(echo "$key_suffix" | tr '[:upper:]' '[:lower:]' | tr '_' '.')" + value_lower="$(echo "$value" | tr '[:upper:]' '[:lower:]')" + + if [[ "$value_lower" == "true" ]]; then + ARGS+=("$cli_key") + elif [[ "$value_lower" == "false" ]]; then + continue + else + ARGS+=("$cli_key" "$value") + fi +done < <(env) + +if [ $# -eq 0 ]; then + python ./subvortex/validator/challenger/src/main.py \ + "${ARGS[@]}" +else + exec "$@" +fi \ No newline at end of file diff --git a/subvortex/validator/challenger/env.template b/subvortex/validator/challenger/env.template new file mode 100644 index 00000000..b3e70261 --- /dev/null +++ b/subvortex/validator/challenger/env.template @@ -0,0 +1,10 @@ +SUBVORTEX_NETUID=7 +SUBVORTEX_AXON_IP=127.0.0.1 +SUBVORTEX_AXON_PORT=8091 +SUBVORTEX_AXON_EXTERNAL_PORT=8091 +SUBVORTEX_SUBTENSOR_NETWORK=finney +SUBVORTEX_DATABASE_HOST=localhost
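+# Note: entrypoint.sh maps each SUBVORTEX_* variable to a CLI flag by stripping
+# the prefix, lowercasing the rest, and turning "_" into "." — so
+# SUBVORTEX_AXON_PORT=8091 is passed as "--axon.port 8091", and a True boolean
+# such as SUBVORTEX_LOGGING_DEBUG becomes the bare flag "--logging.debug"
+# (False and empty values are skipped).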
+SUBVORTEX_DATABASE_PASSWORD= +SUBVORTEX_DATABASE_PORT=6379 +SUBVORTEX_DATABASE_INDEX=0 +SUBVORTEX_LOGGING_DEBUG=True \ No newline at end of file diff --git a/subvortex/validator/challenger/manifest.json b/subvortex/validator/challenger/manifest.json new file mode 100644 index 00000000..3b352198 --- /dev/null +++ b/subvortex/validator/challenger/manifest.json @@ -0,0 +1,27 @@ +{ + "id": "validator-challenger", + "name": "subvortex-validator-challenger", + "description": "SubVortex Validator challenger", + "version": "3.1.0", + "validator.version": "3.1.0", + "validator.challenger.version": "3.1.0", + "type": "python", + "role": "validator", + "component": "challenger", + "common": { + "user": "root", + "group": "root" + }, + "service": { + "environment": { + "PYTHONUNBUFFERED": 1 + }, + "restart": "on-failure", + "restart_sec": "10s", + "log_prefix": "subvortex-validator-challenger" + }, + "depends_on": [ + "validator-redis", + "validator-metagraph" + ] +} diff --git a/subvortex/validator/challenger/pyproject.toml b/subvortex/validator/challenger/pyproject.toml new file mode 100644 index 00000000..3cfd972d --- /dev/null +++ b/subvortex/validator/challenger/pyproject.toml @@ -0,0 +1,46 @@ +[build-system] +requires = ["setuptools>=70"] +build-backend = "setuptools.build_meta" + +[project] +name = "subvortex-validator-challenger" +version = "3.1.0" +description = "SubVortex Validator Challenger" +authors = [{ name = "Eclipse Vortex", email = "subvortex.bt@gmail.com" }] +readme = "README.md" +requires-python = ">=3.10" +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Mathematics", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", +] + +[project.license] +text = "MIT" + +[tool.setuptools] +include-package-data = true + +[tool.setuptools.packages.find] +where = ["."] + +[tool.setuptools.package-data] +"*" = [ + "deployment/**/*", + "requirements.txt", + "README.md", + "pyproject.toml", + "env.template", + "docker-compose.yml", + "metadata.json", + "manifest.json" +] diff --git a/subvortex/validator/challenger/requirements-dev.txt b/subvortex/validator/challenger/requirements-dev.txt new file mode 100644 index 00000000..320fa266 --- /dev/null +++ b/subvortex/validator/challenger/requirements-dev.txt @@ -0,0 +1,4 @@ +-r requirements.txt + +pytest==8.3.5 +pytest_asyncio==0.26.0 \ No newline at end of file diff --git a/subvortex/validator/challenger/requirements.txt b/subvortex/validator/challenger/requirements.txt new file mode 100644 index 00000000..2f68f848 --- /dev/null +++ b/subvortex/validator/challenger/requirements.txt @@ -0,0 +1,2 @@ +bittensor==9.6.0 +bittensor-wallet==3.0.10 \ No newline at end of file diff --git a/subvortex/validator/core/.gitkeep b/subvortex/validator/challenger/src/challenger.py similarity index 100% rename from subvortex/validator/core/.gitkeep rename to subvortex/validator/challenger/src/challenger.py diff --git a/subvortex/validator/challenger/src/main.py b/subvortex/validator/challenger/src/main.py new file mode 100644 index 00000000..6ed619a1 --- /dev/null +++ 
b/subvortex/validator/challenger/src/main.py @@ -0,0 +1,110 @@ +import asyncio +import argparse +import traceback +from dotenv import load_dotenv + +import bittensor.utils.btlogging as btul +import bittensor.core.config as btcc +import bittensor.core.async_subtensor as btcas +import bittensor.core.metagraph as btcm + +import subvortex.core.core_bittensor.config.config_utils as scccu +import subvortex.validator.core.challenger.challenger as sccc +import subvortex.validator.core.challenger.database as sccd + +import subvortex.validator.metagraph.src.settings as svme + + +# Load the environment variables for the whole process +load_dotenv(override=True) + + +async def wait_for_database_connection( + settings: svme.Settings, database: sccd.ChallengerDatabase ) -> None: + btul.logging.warning( + "⏳ Waiting for Redis to become available...", + prefix=settings.logging_name, + ) + + # Ensure the connection + await database.ensure_connection() + + while True: + if await database.is_connection_alive(): + btul.logging.info("✅ Connected to Redis.", prefix=settings.logging_name) + return + + await asyncio.sleep(1) + + +async def main(): + parser = argparse.ArgumentParser() + btul.logging.add_args(parser) + btcas.AsyncSubtensor.add_args(parser) + + # Create the configuration + config = btcc.Config(parser) + + # Create settings + settings = svme.Settings.create() + scccu.update_config(settings, config, parser) + + # Initialise logging + btul.logging(config=config, debug=True) + btul.logging.set_trace(config.logging.trace) + btul.logging._stream_formatter.set_trace(config.logging.trace) + btul.logging.debug(str(config)) + + # Display the settings + btul.logging.info(f"challenger settings: {settings}") + + database = None + challenger = None + subtensor = None + try: + # Create the storage + database = sccd.ChallengerDatabase(settings=settings) + await wait_for_database_connection(settings=settings, database=database) + + # Initialize the subtensor + subtensor = btcas.AsyncSubtensor(config=config) + await subtensor.initialize() + btul.logging.info(str(subtensor)) + + # Initialize the metagraph + # TODO: Tell OTF that if I provide the subtensor, the network will be finney even if the subtensor is on test! + metagraph = btcm.AsyncMetagraph( + netuid=settings.netuid, network=subtensor.network, sync=False + ) + btul.logging.info(str(metagraph)) + + # Create and run the challenger + challenger = sccc.Challenger( + settings=settings, + subtensor=subtensor, + metagraph=metagraph, + database=database, + ) + await challenger.start() + + except Exception as err: + btul.logging.error(f"Error in challenger loop: {err}") + btul.logging.debug("".join(traceback.format_exception(type(err), err, err.__traceback__))) + + except KeyboardInterrupt: + btul.logging.info("Keyboard interrupt detected, exiting.") + + finally: + if database: + await database.mark_as_unready() + + if challenger: + await challenger.stop() + + if subtensor: + await subtensor.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/subvortex/validator/core/Makefile b/subvortex/validator/core/Makefile new file mode 100644 index 00000000..e038a357 --- /dev/null +++ b/subvortex/validator/core/Makefile @@ -0,0 +1,25 @@ +# ========= +# 🧪 Tests +# ========= +TARGETS += test + +test: + PYTHONPATH=../.. pytest . 
$(ARGS) + +# ===================== +# Add the last target +# ===================== +.PHONY: $(TARGETS) help + +# ===================== +# Optional: help target +# ===================== +help: + @echo "📦 CI/CD Targets:" + @echo "" + @echo " test – Run pytest in all service folders" + +targets: + @echo "📋 Available Dynamic Targets:" + @echo "" + @printf " %s\n" $(sort $(TARGETS)) \ No newline at end of file diff --git a/subvortex/validator/core/challenger/challenger.py b/subvortex/validator/core/challenger/challenger.py new file mode 100644 index 00000000..ee90b019 --- /dev/null +++ b/subvortex/validator/core/challenger/challenger.py @@ -0,0 +1,525 @@ +import time +import typing +import asyncio +from collections import Counter + +import bittensor.utils.btlogging as btul +import bittensor.core.metagraph as btcm +import bittensor.core.async_subtensor as btcas + +import subvortex.core.model.neuron as scmn +import subvortex.core.core_bittensor.subtensor as scbs +import subvortex.validator.core.challenger.model as ccm +import subvortex.validator.core.challenger.database as sccd +import subvortex.validator.core.challenger.scheduler as sccc +import subvortex.validator.core.challenger.settings as sccs +import subvortex.validator.core.challenger.challenges.executor as svccce +import subvortex.validator.core.model.miner as cmm + + +class Challenger: + def __init__( + self, + settings: sccs.Settings, + database: sccd.ChallengerDatabase, + subtensor: btcas.AsyncSubtensor, + metagraph: btcm.AsyncMetagraph, + wallet=None, + ): + self.settings = settings + self.database = database + self.subtensor = subtensor + self.metagraph = metagraph + # Wallet whose hotkey identifies this challenger in the schedule (used by start()) + self.wallet = wallet + + self.should_exit = asyncio.Event() + self.finished = asyncio.Event() + + async def start(self): + btul.logging.info( + "🚀 Challenger service starting...", + prefix=self.settings.logging_name, + ) + btul.logging.debug(f"Settings: {self.settings}") + + try: + current_cycle = None + current_block = None + + while not self.should_exit.is_set(): + # Wait for next block to proceed + if not await scbs.wait_for_block(subtensor=self.subtensor): + continue + + # Get the current block + current_block = await self.subtensor.get_current_block() + btul.logging.info( + f"📦 Block #{current_block} detected", prefix=self.settings.logging_name + ) + + # Get the list of neurons + neurons = await self.database.get_neurons() + btul.logging.info( + f"# of neurons: {len(neurons)}", prefix=self.settings.logging_name + ) + + # Get the list of validators and miners + validators, miners = [], [] + for neuron in neurons: + items = validators if self._is_validator(neuron) else miners + items.append(neuron) + + btul.logging.info( + f"# of validators: {len(validators)}", + prefix=self.settings.logging_name, + ) + btul.logging.info( + f"# of miners: {len(miners)}", prefix=self.settings.logging_name + ) + + # Compute the IP occurrences + ip_occurrences = Counter([x.ip for x in miners]) + + # Get the list of countries + countries = self._get_countries(miners) + btul.logging.debug( + f"# of countries: {len(countries)}", + prefix=self.settings.logging_name, + ) + + # Compute the next cycle if needed + if current_cycle is None or current_block >= current_cycle.stop: + current_cycle = sccc.get_next_cycle( + settings=self.settings, + netuid=self.settings.netuid, + block=current_block, + countries=countries, + ) + btul.logging.debug( + f"Current cycle #{current_cycle.start} - #{current_cycle.stop}", + prefix=self.settings.logging_name, + ) + + # Compute the schedule, i.e. the country to challenge at each step + schedule = await sccc.get_schedule( 
substrate=self.subtensor.substrate, + settings=self.settings, + cycle=current_cycle, + validators=validators, + countries=countries, + hotkey=self.wallet.hotkey.ss58_address, + instance=1, # TODO: index of this challenger instance, as we can run multiple of them + ) + btul.logging.debug( + f"Schedule: {[(x.country, x.block_start, x.block_end) for x in schedule]}", + prefix=self.settings.logging_name, + ) + + # Store the schedule + await self.database.add_schedule(schedule=schedule) + btul.logging.trace( + "Schedule stored", + prefix=self.settings.logging_name, + ) + + # Compute the next step and the block where it starts + next_step, next_step_start = sccc.get_next_step( + settings=self.settings, + cycle=current_cycle, + block=current_block, + counter=countries, + ) + btul.logging.debug( + f"Waiting for step {next_step} to start at block: #{next_step_start}", + prefix=self.settings.logging_name, + ) + + # Wait for the beginning of the step + await scbs.wait_for_block( + subtensor=self.subtensor, block=next_step_start + ) + + # Loop through the countries + for step in schedule: + # Get the step number + step_index = step.step_index + + if step_index < next_step: + # Skip until next step is reached + continue + + # Set the start time of the step + step_start_time = time.time() + + # Get the scheduled country + country = step.country + + # Display the starting block + btul.logging.info( + f"[{step_index}] Step starting at block {step.block_start} and finishing at block {step.block_end} for country {country}", + prefix=self.settings.logging_name, + ) + + # Get the list of challengees for the scheduled country + challengees: typing.List[cmm.Miner] = [ + m for m in miners if m.country == country + ] + + # Set the start time of the block we have to wait before challenging + wait_block_start = time.time() + + # Do not challenge if there are no challengees + challenge = None + if len(challengees) > 0: + # Wait for the next block to let the miners' proxies whitelist the validator + btul.logging.debug( + "Waiting for challengees' proxies to be set up", + prefix=self.settings.logging_name, + ) + await scbs.wait_for_block(subtensor=self.subtensor) + wait_block_time = time.time() - wait_block_start + + # # Get the list of identities + # identities = await get_challengee_identities( + # subtensor=self.subtensor, + # netuid=self.config.netuid, + # ) + # btul.logging.debug( + # f"# of identities: {len(identities)}", + # prefix=self.settings.logging_name, + # ) + + # Challenge the challengees + challenge_start_time = time.time() + challenges_result, challenge = ( + await self._challenge_challengers( + step_index=step_index, + settings=self.settings, + ip_occurrences=ip_occurrences, + challengees=challengees, + identities=identities, + ) + ) + challenge_time = time.time() - challenge_start_time + btul.logging.debug( + f"[{step_index}] Challenge completed in {challenge_time} seconds", + prefix=self.settings.logging_name, + ) + + # Compute the scores + score_start_time = time.time() + await self._score_challengers( + step_index=step_index, + settings=self.settings, + challengees=challengees, + challenges_result=challenges_result, + ) + score_time = time.time() - score_start_time + btul.logging.debug( + f"[{step_index}] Scoring completed in {score_time} seconds", + prefix=self.settings.logging_name, + ) + else: + wait_block_time = time.time() - wait_block_start + + btul.logging.warning( + f"[{step_index}] No challenge executed as no challengees are available", + prefix=self.settings.logging_name, + ) + + # Display step time + 
step_time = time.time() - step_start_time + btul.logging.debug( + f"[{step_index}] Step finished in {step_time:.2f}s", + prefix=self.settings.logging_name, + ) + + # Store the challenge + await self.database.add_challenge( + schedule_id=step.id, + challenge=challenge, + process_time=step_time - wait_block_time, + ) + btul.logging.trace( + f"[{step_index}] Challenge {step.id} stored", + prefix=self.settings.logging_name, + ) + + # Notify analytic + # hotkeys = [x.hotkey for x in challengers] + # await self.database.notify_analytic( + # "challenge", + # schedule_id=step.id, + # hotkeys=",".join(hotkeys), + # ) + # btul.logging.trace( + # f"[{step_index}] Challenge {step.id} sent", + # prefix=self.settings.logging_name, + # ) + + btul.logging.debug( + f"[{step_index}] Waiting until block: #{step.block_end}", + prefix=self.settings.logging_name, + ) + + # Wait until the end of the step + await scbs.wait_for_block( + subtensor=self.subtensor, block=step.block_end + ) + + finally: + self.finished.set() + btul.logging.info( + "🛑 Challenger service exiting...", + prefix=self.settings.logging_name, + ) + + async def stop(self): + """ + Signals the challenger to stop and waits for the loop to exit cleanly. + """ + self.should_exit.set() + await self.finished.wait() + btul.logging.info( + "✅ Challenger service stopped", prefix=self.settings.logging_name + ) + + async def _challenge_challengers( + self, + step_index: int, + settings: sccs.Settings, + ip_occurrences: Counter, + challengees: typing.List[cmm.Miner], + identities: typing.Dict[str, typing.List], + ): + # Define the result + checks_result = {} + + # Define the available challengers + available_challengers = list(challengees) + + # Execute some checks + for challenger in challengees: + ip = challenger.ip + hotkey = challenger.hotkey + + # Check the identity of the miner is set + identity = identities.get(hotkey) + if identity is None: + checks_result[hotkey] = ccm.ChallengeResult.create_failed( + reason="Identity is not set", + challenge_attempts=settings.default_challenge_max_iteration, + avg_process_time=settings.challenge_timeout, + ) + continue + + # Check if the IP is associated with more than 1 miner + if ip_occurrences[ip] > 1: + checks_result[hotkey] = ccm.ChallengeResult.create_failed( + reason=f"{ip_occurrences[ip]} miners associated with the ip {ip}", + challenge_attempts=settings.default_challenge_max_iteration, + avg_process_time=settings.challenge_timeout, + ) + continue + + # Check there are some challenges for the different nodes!
+ + # Filter out challengers that failed the different checks + available_challengers = list( + set(available_challengers) + - set([x for x in available_challengers if x.hotkey in checks_result]) + ) + + challenge = None + challenges_result = {} + if len(available_challengers) > 0: + # Execute challenge + challenges_result, challenge = await svccce.execute_challenge( + settings=settings, + subtensor=self.subtensor, + challengers=available_challengers, + nodes=identities, + ) + else: + btul.logging.warning( + f"[{step_index}] Skip the challenge as no challengers have nodes registered", + prefix=self.settings.logging_name, + ) + + # Build result + result = { + **checks_result, + **challenges_result, + } + + return result, challenge + + async def _score_challengers( + self, + step_index: int, + settings: sccs.Settings, + challengees: typing.List[cmm.Miner], + challenges_result: typing.Dict[str, ccm.ChallengeResult], + ): + # Compute the metadata + for challenger in challengees: + # Get the challenge result for the challenger + challenge_result: ccm.ChallengeResult = challenges_result.get(challenger.hotkey) + + # Get the score of the challenger (the `scores` mapping is assumed loaded elsewhere) + score = scores.get(challenger.hotkey) + score.success = challenge_result.is_successful + score.reason = challenge_result.reason + + # Display the hotkey + btul.logging.debug( + f"[{step_index}][{challenger.uid}] Hotkey: {challenger.hotkey}" + ) + + if challenge_result.chain: + btul.logging.debug( + f"[{step_index}][{challenger.uid}] Challenge - Chain: {challenge_result.chain}, Type: {challenge_result.type}" + ) + + # Check if the miner/subtensor are verified + if not score.success: + btul.logging.warning( + f"[{step_index}][{challenger.uid}] Challenge failed - {challenge_result.reason}", + prefix=self.settings.logging_name, + ) + + # Availability metadata + score.availability_attempts, score.availability_successes = ( + refresh_availability_metadata( + settings=settings, result=challenge_result, score=score + ) + ) + + # Reliability metadata + score.reliability_attempts, score.reliability_successes = ( + refresh_reliability_metadata( + settings=settings, result=challenge_result, score=score + ) + ) + + # Latency metadata + score.latency_times = refresh_latency_metadata( + settings=settings, result=challenge_result, score=score + ) + + # Performance metadata + ( + score.performance_attempts, + score.performance_successes, + score.performance_boost, + ) = refresh_performance_metadata( + settings=settings, result=challenge_result, score=score + ) + + # Compute Individual and Peers scores + # Individual score: score that is computed based on the miner itself + # Peers score: score that is computed based on the challenged miners + for challenger in challengees: + # Get the challenge result for the challenger + challenge_result: ccm.ChallengeResult = challenges_result.get(challenger.hotkey) + + # Get the score of the challenger + score = scores.get(challenger.hotkey) + + # Compute score for availability + score.availability_score = compute_availability_score(score=score) + btul.logging.debug( + f"[{step_index}][{challenger.uid}] Availability score {score.availability_score}", + prefix=self.settings.logging_name, + ) + + # Compute score for reliability + score.reliability_score = compute_reliability_score(score=score) + btul.logging.debug( + f"[{step_index}][{challenger.uid}] Reliability score {score.reliability_score}", + prefix=self.settings.logging_name, + ) + + # Compute score for latency + score.latency_score = compute_latency_score(
settings=settings, + scores=scores, + challenger=challenger, + ) + btul.logging.debug( + f"[{step_index}][{challenger.uid}] Latency score {score.latency_score}", + prefix=self.settings.logging_name, + ) + + # Compute score for performance + score.performance_score = compute_performance_score( + scores=scores, + challenger=challenger, + ) + btul.logging.debug( + f"[{step_index}][{challenger.uid}] Performance score {score.performance_score}", + prefix=self.settings.logging_name, + ) + + # Compute Distribution and final scores + # Distribution score: score that is computed based on the x top miners + for challenger in challengees: + # Get the challenge result for the challenger + challenge_result: ccm.ChallengeResult = challenges_result.get(challenger.hotkey) + + # Get the score of the challenger + score = scores.get(challenger.hotkey) + + # Compute score for distribution + score.distribution_score = compute_distribution_score( + settings=settings, + challenger=challenger, + scores=scores, + ) + btul.logging.debug( + f"[{step_index}][{challenger.uid}] Distribution score {score.distribution_score}", + prefix=self.settings.logging_name, + ) + + # Compute final score + score.final_score = compute_final_score(settings, score) + btul.logging.info( + f"[{step_index}][{challenger.uid}] Final score {score.final_score}", + prefix=self.settings.logging_name, + ) + + # Compute the moving score + # TODO: Take into account the fact that the miner may be flagged! + score.moving_score = ( + settings.MOVING_SCORE_ALPHA * score.final_score + + (1 - settings.MOVING_SCORE_ALPHA) * score.moving_score + ) + btul.logging.info( + f"[{step_index}][{challenger.uid}] Moving score {score.moving_score}", + prefix=self.settings.logging_name, + ) + + + def _is_validator(self, neuron: scmn.Neuron): + return neuron.validator_trust > 0 and neuron.stake >= 1000 + + def _get_countries( + self, neurons: typing.List[scmn.Neuron] + ) -> typing.List[typing.Tuple[str, int]]: + # Count how many times each value appears in the list + country_counter = Counter([x.country for x in neurons if x.country != ""]) + + # Build the list of countries + countries = sorted( + set( + [ + (x.country, country_counter[x.country]) + for x in neurons + if x.country in country_counter.keys() + ] + ) + ) + + return countries diff --git a/subvortex/validator/core/challenger/challenges/__init__.py b/subvortex/validator/core/challenger/challenges/__init__.py new file mode 100644 index 00000000..cf645d49 --- /dev/null +++ b/subvortex/validator/core/challenger/challenges/__init__.py @@ -0,0 +1,3 @@ +import subvortex.validator.core.challenger.challenges.bittensor.challenge_neuron_info as svccni + +CHALLENGES = {"bittensor": [(svccni.create_challenge, svccni.execute_challenge)]} diff --git a/subvortex/validator/core/challenger/challenges/bittensor/challenge_neuron_info.py b/subvortex/validator/core/challenger/challenges/bittensor/challenge_neuron_info.py new file mode 100644 index 00000000..bf986e55 --- /dev/null +++ b/subvortex/validator/core/challenger/challenges/bittensor/challenge_neuron_info.py @@ -0,0 +1,251 @@ +import json +import time +import random +import asyncio +import traceback +import websockets +from websockets.exceptions import ( + ConnectionClosed, + InvalidURI, + InvalidHandshake, + WebSocketException, +) + +import bittensor.core.async_subtensor as btcas +import bittensor.utils.btlogging as btul + +import subvortex.validator.core.challenger.constants as ccc +import subvortex.validator.core.challenger.model as ccm +import subvortex.validator.core.challenger.settings as ccs + + +async def 
create_challenge(settings: ccs.Settings, node_type: str, **kwargs) -> tuple: + subtensor = kwargs.get("subtensor") + + if node_type == "lite": + return await _create_lite_challenge(settings=settings, subtensor=subtensor) + else: + btul.logging.warning(f"Node type {node_type} is not implemented") + + +async def execute_challenge( + settings: ccs.Settings, ip: str, port: int, challenge: tuple +) -> ccm.TaskResult: + # Unpack the challenge + try: + params, block_hash, value = challenge + except (TypeError, ValueError) as e: + return ccm.TaskResult.create( + reason=f"Invalid challenge format: {e}", + process_time=settings.challenge_timeout, + ) + btul.logging.debug(f"Challenge {params} / {block_hash} / {value}") + + # Set IP/Port + ws_url = ( + f"wss://{ip}:{port}" + if ip == "archive.chain.opentensor.ai" + else f"ws://{ip}:{port}" + ) + btul.logging.debug(f"Challenging {ws_url}") + + # Set start time + start_time = time.time() + + ws = None + try: + # Attempt to establish WebSocket connection + ws = await websockets.connect(ws_url) + except asyncio.TimeoutError: + return ccm.TaskResult.create(reason="WebSocket connection timed out") + except (InvalidURI, OSError): + return ccm.TaskResult.create(reason=f"Invalid WebSocket URI: {ws_url}") + except InvalidHandshake as e: + return ccm.TaskResult.create(reason=f"WebSocket handshake failed: {e}") + except WebSocketException as e: + return ccm.TaskResult.create(reason=f"WebSocket error: {e}") + + # Prepare data payload + data = json.dumps( + { + "id": "state_call0", + "jsonrpc": "2.0", + "method": "state_call", + "params": [ + "NeuronInfoRuntimeApi_get_neuron_lite", + params, + block_hash, + ], + } + ) + + try: + # Send request + await ws.send(data) + except ConnectionClosed as e: + return ccm.TaskResult.create( + reason=f"WebSocket closed before sending data: {e}" + ) + + try: + # Receive response (bounded by the challenge timeout) + response = await asyncio.wait_for(ws.recv(), timeout=settings.challenge_timeout) + except asyncio.TimeoutError: + return ccm.TaskResult.create(reason="WebSocket receive timed out") + except ConnectionClosed as e: + return ccm.TaskResult.create( + reason=f"WebSocket closed before receiving response: {e}" + ) + + # Calculate process time + process_time = time.time() - start_time + + # Load the response + try: + response = json.loads(response) + except json.JSONDecodeError: + return ccm.TaskResult.create( + is_available=True, reason="Received malformed JSON response" + ) + + # Validate response + if "error" in response: + return ccm.TaskResult.create( + is_available=True, + reason=f"Error in response: {response['error'].get('message', 'Unknown error')}", + ) + + if "result" not in response: + return ccm.TaskResult.create( + is_available=True, + reason="Response does not contain a 'result' field", + ) + + if response["result"] != value: + return ccm.TaskResult.create( + is_available=True, + reason="Received incorrect result", + process_time=process_time, + ) + + # Ensure WebSocket is closed properly + if ws: + await ws.close() + + return ccm.TaskResult.create( + is_available=True, is_reliable=True, process_time=process_time + ) + + +async def _create_lite_challenge( + settings: ccs.Settings, subtensor: btcas.AsyncSubtensor ) -> tuple: + try: + # Get the current block from the miner subtensor + current_block = await subtensor.get_current_block() + + # Select a block between [current block - 256, current block - 10] + block = random.randint( + current_block - ccc.LITE_NODE_BLOCK_LOWER_LIMIT, + current_block - ccc.LITE_NODE_BLOCK_UPPER_LIMIT, + ) + btul.logging.debug( + f"[Lite] Block chosen: {block}", prefix=settings.logging_name + ) + + # Get the hash of the 
chosen block + block_hash = await subtensor.get_block_hash(block) + btul.logging.debug( + f"[Lite] Block hash chosen: {block_hash}", prefix=settings.logging_name + ) + + # Get the number of subnets + subnet_count = await subtensor.get_total_subnets(block=block) - 1 + btul.logging.debug( + f"[Lite] # of subnets: {subnet_count}", prefix=settings.logging_name + ) + + # Be sure we select a subnet that has at least one neuron + subnet_to_exclude = [] + subnet_uid = None + neuron_count = 0 + while neuron_count == 0: + if subnet_uid is not None: + subnet_to_exclude.append(subnet_uid) + + # Select a subnet + subnet_uid = random.choice( + list(set(range(subnet_count + 1)) - set(subnet_to_exclude)) + ) + + # Get the total number of neurons + neuron_count = await subtensor.subnetwork_n(subnet_uid, block) + + # Select a neuron + neuron_uid = random.randint(0, neuron_count - 1) + + # Get the runtime call definition + runtime_call_def = _get_runtime_call_definition( + substrate=subtensor.substrate, + api="NeuronInfoRuntimeApi", + method="get_neuron_lite", + ) + + # Encode the parameters + params = _new_encode( + substrate=subtensor.substrate, + runtime_call_def=runtime_call_def, + params=[subnet_uid, neuron_uid], + ) + params = params.hex() + + # Send the RPC request + response = await subtensor.substrate.rpc_request( + "state_call", + ["NeuronInfoRuntimeApi_get_neuron_lite", params, block_hash], + ) + + # Get the value + value = response.get("result") + + btul.logging.debug( + f"[Lite] Challenge created - Block: {block}, Netuid: {subnet_uid}, Uid: {neuron_uid}", + prefix=settings.logging_name, + ) + + return (params, block_hash, value) + except Exception as err: + btul.logging.warning( + f"[Lite] Could not create the challenge: {err}", + prefix=settings.logging_name, + ) + btul.logging.warning(traceback.format_exc(), prefix=settings.logging_name) + return None + + +def _get_runtime_call_definition( + substrate: btcas.AsyncSubstrateInterface, api: str, method: str ): + try: + metadata_v15_value = substrate.runtime.metadata_v15.value() + + apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]} + api_entry = apis[api] + methods = {entry["name"]: entry for entry in api_entry["methods"]} + return methods[method] + except KeyError: + raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") + + +def _new_encode(substrate: btcas.AsyncSubstrateInterface, runtime_call_def, params=None): + # Avoid a mutable default argument + params = params if params is not None else {} + param_data = b"" + + for idx, param in enumerate(runtime_call_def["inputs"]): + param_type_string = f"scale_info::{param['ty']}" + if isinstance(params, list): + param_data += substrate._encode_scale(param_type_string, params[idx]) + else: + if param["name"] not in params: + raise ValueError(f"Runtime Call param '{param['name']}' is missing") + + param_data += substrate._encode_scale( + param_type_string, params[param["name"]] + ) + return param_data diff --git a/subvortex/validator/core/challenger/challenges/executor.py b/subvortex/validator/core/challenger/challenges/executor.py new file mode 100644 index 00000000..58b710a8 --- /dev/null +++ b/subvortex/validator/core/challenger/challenges/executor.py @@ -0,0 +1,205 @@ +import typing +import random +import asyncio +from itertools import chain + +import bittensor.core.async_subtensor as btcas +import bittensor.utils.btlogging as btul + +import subvortex.validator.core.model.miner as cmm +import subvortex.validator.core.challenger.settings as ccs +import subvortex.validator.core.challenger.challenges as ccc +import 
subvortex.validator.core.challenger.model as ccm + + +async def execute_challenge( + settings: ccs.Settings, + subtensor: btcas.AsyncSubtensor, + challengers: typing.List[cmm.Miner], + nodes: typing.Dict[str, typing.List], +): + tasks = [] + + # Get the nodes of all the challengers + challengers_nodes = list( + chain.from_iterable([nodes[x.hotkey] for x in challengers]) + ) + + # Get all the unique chains of the nodes + node_chains = list(set([x.get("chain") for x in challengers_nodes])) + btul.logging.debug(f"# of chains: {len(node_chains)}", prefix=settings.logging_name) + + # Select the chain node to challenge + node_chain_selected = random.choice(node_chains) + btul.logging.debug( + f"Chain selected: {node_chain_selected}", prefix=settings.logging_name + ) + + # Get all the unique types of the nodes on the selected chain + node_types = list( + set( + [ + x.get("type") + for x in challengers_nodes + if x.get("chain") == node_chain_selected + ] + ) + ) + btul.logging.debug( + f"# of chain types: {len(node_types)}", prefix=settings.logging_name + ) + + # Select the type node to challenge + # TODO: once we have added the type to the schedule, take it instead of picking randomly! + node_type_selected = random.choice(node_types) + btul.logging.debug( + f"Chain type: {node_type_selected}", prefix=settings.logging_name + ) + + # List of nodes of the selected chains + nodes_selected = { + hotkey: [ + node + for node in node_list + if node["chain"] == node_chain_selected + and node["type"] == node_type_selected + ] + for hotkey, node_list in nodes.items() + if any(node["chain"] == node_chain_selected for node in node_list) + } + btul.logging.debug(f"Node selected: {nodes_selected}", prefix=settings.logging_name) + + # Get the max connections across all the nodes selected + max_connection = max( + [ + node.get("max-connection", 0) + for node_list in nodes_selected.values() + for node in node_list + ] + + [settings.default_challenge_max_iteration] + ) + btul.logging.debug( + f"Max connection: {max_connection}", prefix=settings.logging_name + ) + + # Get the list of available challenges for the chain + chain_challenges = ccc.CHALLENGES.get(node_chain_selected) + + # Choose a challenge randomly + create_challenge_fn, execute_challenge_fn = random.choice(chain_challenges) + + # Create the challenge + btul.logging.info("Creating challenge", prefix=settings.logging_name) + challenge = await create_challenge_fn( + settings=settings, node_type=node_type_selected, subtensor=subtensor + ) + btul.logging.debug(f"Challenge created: {challenge}", prefix=settings.logging_name) + + btul.logging.info("Executing challenge", prefix=settings.logging_name) + for i in range(max_connection): + for challenger in challengers: + challenger_nodes = list(nodes_selected.get(challenger.hotkey, [])) + for node in challenger_nodes: + # Check if the node allows fewer connections than the current iteration + max_node_connection = max( + node.get("max-connection", 0), + settings.default_challenge_max_iteration, + ) + + if i >= max_node_connection: + # Reached the max connection for that node + continue + + task = asyncio.create_task( + execute_challenge_fn( + settings=settings, + ip=challenger.ip, + port=node.get("port"), + challenge=challenge, + ), + name=f"{node.get('chain')}-{node.get('type')}-{challenger.hotkey}-{i + 1}", + ) + tasks.append(task) + + # Wait until finished or timed out + done, pending = ( + await asyncio.wait(tasks, timeout=settings.challenge_timeout) + if len(tasks) > 0 + else (set(), set()) + ) + + # Cancel any remaining tasks that didn't finish on time + for task in 
pending: + task.cancel() + + btul.logging.debug("Challenge execution finished", prefix=settings.logging_name) + + # Build the result + result = _aggregate_results(done | pending) + + return result, challenge + + +def _aggregate_results(tasks): + challenges_result: typing.Dict[str, ccm.ChallengeResult] = {} + + for task in tasks: + # Get the task details + task_details = task.get_name().split("-") + + # Get the chain, type and hotkey from the task name + chain = task_details[0] + type = task_details[1] + hotkey = task_details[2] + + # Get the challenge result + challenge_result = challenges_result.get(hotkey) + if challenge_result is None: + challenge_result = challenges_result[hotkey] = ( + # Set is_reliable True because the node will be considered reliable if all its tasks are available/reliable + ccm.ChallengeResult.create_default( + chain=chain, type=type, is_reliable=True, + ) + ) + + # Increment the number of attempts + challenge_result.challenge_attempts += 1 + + if task.cancelled() or not task.done(): + continue + + # Get the challenge result + task_result: ccm.TaskResult = task.result() + + # Will be true if at least one task is available + challenge_result.is_available |= task_result.is_available + + # Will be true if all tasks are reliable + challenge_result.is_reliable &= ( + task_result.is_available and task_result.is_reliable + ) + + # Increment the number of successes + challenge_result.challenge_successes += int(challenge_result.is_reliable) + + # Join unique reasons + challenge_result.reason = ( + ",".join( + set(challenge_result.reason.split(",") + [task_result.reason.strip()]) + ) + ).strip() + + # Compute the average process time + challenge_result.avg_process_time = ( + challenge_result.avg_process_time + task_result.process_time + ) / 2 + + for result in challenges_result.values(): + if ( + result.reason == "" + and result.challenge_attempts != result.challenge_successes + ): + result.reason = f"{result.challenge_attempts - result.challenge_successes} task(s) have been cancelled or not completed" + continue + + return challenges_result diff --git a/subvortex/validator/core/challenger/constants.py b/subvortex/validator/core/challenger/constants.py new file mode 100644 index 00000000..c10f98aa --- /dev/null +++ b/subvortex/validator/core/challenger/constants.py @@ -0,0 +1,9 @@ +# Number of seconds to build a block +BLOCK_BUILD_TIME = 12 + +# After this many blocks pass, a block can be considered final +DYNAMIC_BLOCK_FINALIZATION_NUMBER = 3 + +# Number of historic blocks a lite node has (300 blocks in total) +LITE_NODE_BLOCK_UPPER_LIMIT = 10 +LITE_NODE_BLOCK_LOWER_LIMIT = 250 \ No newline at end of file diff --git a/subvortex/validator/core/challenger/database.py b/subvortex/validator/core/challenger/database.py new file mode 100644 index 00000000..497d8146 --- /dev/null +++ b/subvortex/validator/core/challenger/database.py @@ -0,0 +1,111 @@ +import traceback + +import bittensor.utils.btlogging as btul + +from subvortex.core.database.database_utils import decode_hash, decode_value +from subvortex.core.metagraph.database import NeuronReadOnlyDatabase +from subvortex.validator.core.model.schedule import ( + Schedule, + ScheduleModel210, +) +from subvortex.validator.core.model.challenge import ( + Challenge, + ChallengeModel210, +) + + +class ChallengerDatabase(NeuronReadOnlyDatabase): + + def __init__(self, settings): + super().__init__(settings=settings) + + # Register schedule and challenge models keyed by their version + self.models["schedule"] = {x.version: x for x in [ScheduleModel210()]} + self.models["challenge"] = 
{x.version: x for x in [ChallengeModel210()]} + + async def get_schedule(self, hotkey: str) -> Schedule: + # Ensure the connection is up and running + await self.ensure_connection() + + # Get the active versions + _, active = await self._get_migration_status("schedule") + + for version in reversed(active): + model = self.models["schedule"].get(version) + if not model: + continue + + try: + # Attempt to read the schedule using the model + schedule = await model.read(self.database, hotkey) + return schedule + + except Exception as ex: + btul.logging.warning( + f"[get_schedule] Failed to read schedule for hotkey='{hotkey}' using version={version}: {ex}", + prefix=self.settings.logging_name, + ) + btul.logging.debug( + f"[get_schedule] Exception type: {type(ex).__name__}, Traceback:\n{traceback.format_exc()}", + prefix=self.settings.logging_name, + ) + + return None + + async def add_schedule(self, schedule: Schedule): + # Ensure the connection is up and running + await self.ensure_connection() + + # Get the active versions + _, active = await self._get_migration_status("schedule") + + for version in active: + model = self.models["schedule"].get(version) + if not model: + continue + + try: + await model.write(self.database, schedule) + + except Exception as ex: + btul.logging.error( + f"[{version}] Add schedule failed for {schedule.hotkey}: {ex}", + prefix=self.settings.logging_name, + ) + btul.logging.debug( + f"[add_schedule] Exception type: {type(ex).__name__}, Traceback:\n{traceback.format_exc()}", + prefix=self.settings.logging_name, + ) + + return None + + async def add_challenge(self, challenge: Challenge): + # Ensure the connection is up and running + await self.ensure_connection() + + # Get the active versions + _, active = await self._get_migration_status("challenge") + + for version in active: + model = self.models["challenge"].get(version) + if not model: + continue + + try: + await model.write(self.database, challenge) + + except Exception as ex: + btul.logging.error( + f"[{version}] Add challenge failed for {challenge.hotkey}: {ex}", + prefix=self.settings.logging_name, + ) + btul.logging.debug( + f"[add_challenge] Exception type: {type(ex).__name__}, Traceback:\n{traceback.format_exc()}", + prefix=self.settings.logging_name, + ) + + return None + + def _key(self, key: str): + # Prefixes keys to namespace them under this service + return f"{self.settings.key_prefix}:{key}" diff --git a/subvortex/validator/core/challenger/model.py b/subvortex/validator/core/challenger/model.py new file mode 100644 index 00000000..0e330b0f --- /dev/null +++ b/subvortex/validator/core/challenger/model.py @@ -0,0 +1,99 @@ +from dataclasses import dataclass + + +@dataclass +class TaskResult: + # True if the node can answer at least one request, false otherwise + is_available: bool + + # True if the node answers correctly, false otherwise + is_reliable: bool + + # Reason of the failure of the challenge (is_available and/or is_reliable is false) + reason: str + + # Time taken to process the challenge request, in seconds + process_time: float + + @staticmethod + def create( + is_available: bool = False, + is_reliable: bool = False, + reason: str = "", + process_time: float = 0.0, + ): + return TaskResult( + is_available=is_available, + is_reliable=is_reliable, + reason=reason, + process_time=process_time, + ) + + +@dataclass +class ChallengeResult: + # Name of the blockchain of the node + chain: str + + # Name of the type of the node + type: str + + # True if the node can answer at least one request, false otherwise + is_available: bool + + # True if 
the node answers correctly, false otherwise + is_reliable: bool + + # Reason of the failure of the challenge (is_available and/or is_reliable is false) + reason: str + + # Number of attempts to challenge the node + challenge_attempts: int + + # Number of successes challenging the node + challenge_successes: int + + # Average time of all the successful challenges + avg_process_time: float + + @property + def is_successful(self): + """ + True if the node is available and reliable, false otherwise + """ + return self.is_available and self.is_reliable + + @staticmethod + def create_default( + chain=None, + type=None, + is_available=False, + is_reliable=False, + reason="", + challenge_attempts=0, + challenge_successes=0, + avg_process_time=0, + ): + return ChallengeResult( + chain=chain, + type=type, + is_available=is_available, + is_reliable=is_reliable, + reason=reason, + challenge_attempts=challenge_attempts, + challenge_successes=challenge_successes, + avg_process_time=avg_process_time, + ) + + @staticmethod + def create_failed(reason: str, challenge_attempts: int, avg_process_time: float): + return ChallengeResult( + chain=None, + type=None, + is_available=False, + is_reliable=False, + reason=reason, + challenge_attempts=challenge_attempts, + challenge_successes=0, + avg_process_time=avg_process_time, + ) diff --git a/subvortex/validator/core/challenger/scheduler.py b/subvortex/validator/core/challenger/scheduler.py new file mode 100644 index 00000000..107bd929 --- /dev/null +++ b/subvortex/validator/core/challenger/scheduler.py @@ -0,0 +1,191 @@ +import math +import random +import typing +import hashlib + +import bittensor.core.async_subtensor as btcas +import bittensor.utils.btlogging as bul + +import subvortex.core.model.neuron as scmn +import subvortex.validator.core.model.schedule as scms +import subvortex.validator.core.challenger.settings as sccs +import subvortex.validator.core.challenger.constants as sccc + +async def get_schedules( + substrate: btcas.AsyncSubstrateInterface, + settings: sccs.Settings, + cycle: range, + validators: typing.List[scmn.Neuron], + countries: typing.Dict[str, int], +) -> typing.Dict[str, typing.List[scms.Schedule]]: + schedules = {} + + # Sort validators by stake, highest first + # TODO: do we want to allow more validators based on the number of nodes to test?
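+ # Note: the selection below must stay deterministic across validators —
+ # get_schedule seeds random.Random from a finalized block hash, so every
+ # validator independently derives the same country assignment for the cycle.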
+ validators = sorted(validators, key=lambda x: x.total_stake, reverse=True) + + # Take the best validators to cover the list of countries + validators = validators[: len(countries)] + bul.logging.debug(f"# of challengers: {len(validators)}") + + for validator in validators: + # Compute the miners selection the validator will have to challenge + schedule = await get_schedule( + substrate=substrate, + settings=settings, + cycle=cycle, + validators=validators, + countries=countries, + hotkey=validator.hotkey, + instance=0, + ) + + # Store the selection for the validator + schedules[validator.uid] = schedule + + return schedules + + +async def get_schedule( + substrate: btcas.AsyncSubstrateInterface, + settings: sccs.Settings, + cycle: range, + validators: typing.List[scmn.Neuron], + countries: typing.Dict[str, int], + hotkey: str, + instance: int, +) -> typing.List[scms.Schedule]: + # Get the hash of the block + block_hash = await substrate.get_block_hash( + cycle.start - sccc.DYNAMIC_BLOCK_FINALIZATION_NUMBER + ) + + # Use the block hash as a seed + seed = _seed(block_hash) + + # Create random instance + rm = random.Random(seed) + + # Shuffle the countries + rcountries = list(countries) + rm.shuffle(rcountries) + + # Get the validator hotkeys + ordered_hotkeys = [v.hotkey for v in validators] + + # Get the index of the current hotkey + index = ordered_hotkeys.index(hotkey) if hotkey in ordered_hotkeys else -1 + if index == -1: + raise Exception( + "Validator is not part of the 64 validators. Please stake more." + ) + + # Set the step details + step_start = cycle.start + + # Compute the number of blocks needed for this step + step_blocks = get_step_blocks(settings=settings, counter=dict(rcountries)) + + steps = [] + while len(steps) < len(rcountries): + # Get the country + country = rcountries[(index + instance) % len(rcountries)][0] + + # Compute the end of the step + step_end = step_start + step_blocks + + # Create the new schedule + schedule = scms.Schedule.create( + index=index, + instance=instance, + cycle_start=cycle.start, + cycle_end=cycle.stop, + block_start=step_start, + block_end=step_end, + country=country, + ) + + # Add the new schedules + steps.append(schedule) + + # The end of the previous step becomes the start of the next one + step_start = step_end + + index += 1 + + return steps + + +def get_next_cycle( + settings: sccs.Settings, netuid: int, block: int, countries: typing.Dict[str, int] ) -> range: + """ + Compute the next cycle from the block + """ + # Create a counter + countries = dict(countries) + + # Get the number of blocks needed for a cycle + cycle_blocks = len(countries) * get_step_blocks(settings=settings, counter=countries) + + # Get the cycle of the current block + cycle = get_epoch_containing_block( + block=block, netuid=netuid, tempo=cycle_blocks, adjust=0 + ) + + return cycle + + +def get_next_step( + settings: sccs.Settings, cycle: range, block: int, counter: typing.Dict[str, int] ): + # Compute the number of blocks for a step + step_blocks = get_step_blocks(settings=settings, counter=dict(counter)) + + # Compute the number of steps since the cycle started + steps_since_start = math.ceil((block - cycle.start) / step_blocks) + + # Compute the next step + next_step = steps_since_start + 1 + + # Compute the block where the next step starts + next_step_start = cycle.start + steps_since_start * step_blocks + + return next_step, next_step_start + + +def get_step_blocks(settings: sccs.Settings, counter: typing.Dict[str, int]): + """ + Compute the number of blocks to execute the 
longest step across all challengers + """ + if len(counter) == 0: + return 0 + + # Find the largest challengee count in the counter + max_occurrence = max(counter.values()) + + # Compute the total time to challenge all the neurons + total_time = max_occurrence * settings.max_challenge_time_per_miner + + return math.ceil(total_time / sccc.BLOCK_BUILD_TIME) + 1 + + +def get_epoch_containing_block( + block: int, netuid: int, tempo: int = 360, adjust: int = 1 ) -> range: + """ + Get the current epoch of the block + """ + assert tempo > 0 + + interval = tempo + adjust + last_epoch = block - adjust - (block + netuid + adjust) % interval + next_tempo_block_start = last_epoch + interval + return range(last_epoch, next_tempo_block_start) + + +def _seed(value): + """Convert a large integer to a distinct 64-bit seed using SHA-256 and XOR folding.""" + hashed = hashlib.sha256(str(value).encode()).digest() + seed = int.from_bytes(hashed[:8], "big") ^ int.from_bytes(hashed[8:16], "big") + return seed diff --git a/subvortex/validator/core/challenger/score.py b/subvortex/validator/core/challenger/score.py new file mode 100644 index 00000000..df2bac12 --- /dev/null +++ b/subvortex/validator/core/challenger/score.py @@ -0,0 +1,330 @@ +# The MIT License (MIT) +# Copyright © 2025 Eclipse Vortex + +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. + +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. +import numpy as np +from typing import Dict + +import bittensor.utils.btlogging as btul + +import subvortex.validator.core.challenger.settings as sccs +import subvortex.validator.core.challenger.model as ccm +import subvortex.validator.core.challenger.utils as ccu + +# Longest distance between any two places on Earth is 20,010 kilometers +MAX_DISTANCE = 20010 + +# Decay rate parameter controlling the distribution score decrease. 
+
+# Decay rate parameter controlling the distribution score decrease.
+BASE_SCORE = 1
+DECAY_RATE = 0.5
+
+
+def refresh_availability_metadata(
+    settings: sccs.Settings, result: ccm.ChallengeResult, score: Score
+):
+    attempts = ccu.apply_sma(settings, score.availability_attempts, 1, 0)
+    btul.logging.trace(
+        f"[{score.uid}][Score][Availability] # of attempts {score.availability_attempts}",
+        prefix=settings.logging_name,
+    )
+
+    successes = ccu.apply_sma(
+        settings,
+        score.availability_successes,
+        int(result.is_available),
+        0,
+    )
+    btul.logging.trace(
+        f"[{score.uid}][Score][Availability] # of successes {score.availability_successes}",
+        prefix=settings.logging_name,
+    )
+
+    return attempts, successes
+
+
+def compute_availability_score(score: Score):
+    """
+    Compute the availability score of the uid
+    """
+    # Compute the attempts/successes
+    attempts = sum(score.availability_attempts)
+
+    successes = sum(score.availability_successes)
+
+    # Normalization
+    score = successes / attempts if attempts > 0 else 0
+
+    return score
+
+
+def refresh_reliability_metadata(
+    settings: sccs.Settings, result: ccm.ChallengeResult, score: Score
+):
+    attempts = ccu.apply_sma(settings, score.reliability_attempts, 1, 0)
+    btul.logging.trace(
+        f"[{score.uid}][Score][Reliability] # of attempts {score.reliability_attempts}",
+        prefix=settings.logging_name,
+    )
+
+    successes = ccu.apply_sma(
+        settings,
+        score.reliability_successes,
+        int(result.is_reliable),
+        0,
+    )
+    btul.logging.trace(
+        f"[{score.uid}][Score][Reliability] # of successes {score.reliability_successes}",
+        prefix=settings.logging_name,
+    )
+
+    return attempts, successes
+
+
+def compute_reliability_score(score: Score):
+    """
+    Compute the reliability score of the uid
+    """
+    # Compute the attempts/successes
+    attempts = sum(score.reliability_attempts)
+
+    successes = sum(score.reliability_successes)
+
+    # Normalization
+    score = successes / attempts if attempts > 0 else 0
+
+    return score
+
+
+def refresh_latency_metadata(
+    settings: sccs.Settings, result: ccm.ChallengeResult, score: Score
+):
+    latency_times = ccu.apply_sma(
+        settings,
+        score.latency_times,
+        result.avg_process_time,
+        settings.challenge_timeout,
+    )
+    btul.logging.trace(
+        f"[{score.uid}][Score][Latency] Process times {latency_times}",
+        prefix=settings.logging_name,
+    )
+    return latency_times
+
+
+# TODO: Put the score at 0 if the minimum attempts are not all successful, or find a way to not get 1 when you are alone!
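+# Illustrative note on the rank-based formula below (assumed DECAY_RATE = 0.5):
+# compute_score maps rank 0 -> 1.00, rank 1 -> ~0.61, rank 2 -> ~0.37, so earlier
+# ranks earn exponentially more. When fewer than TOP_X_MINERS scores are present,
+# the adjustment_factor pulls rank 0 down and lifts later ranks up by a fraction
+# of the gap to the neighbouring rank, softening the advantage of being first in
+# a small population.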
+def compute_latency_score(
+    settings: sccs.Settings,
+    scores: Dict[str, Score],
+    challenger: Neuron,
+):
+    """
+    Compute the latency score of the uid
+    """
+    # Compute the average latency for each challenger
+    latencies = [(x.hotkey, np.mean(x.latency_times).item()) for x in scores.values()]
+
+    # Sort the challengers by average latency in ascending order (lower is better)
+    latencies_sorted = sorted(latencies, key=lambda x: x[1])
+
+    # Find the rank of the challenger
+    challenger_rank = next(
+        (i for i, x in enumerate(latencies_sorted) if x[0] == challenger.hotkey), -1
+    )
+    if challenger_rank == -1:
+        return LATENCY_FAILURE_REWARD
+
+    btul.logging.info(f"Rank: {challenger_rank}", prefix=settings.logging_name)
+
+    # Calculate the adjustment factor
+    adjustment_factor = (TOP_X_MINERS - len(scores)) / TOP_X_MINERS
+
+    # Set the sign modifier based on the index
+    sign_modifier = -1 if challenger_rank == 0 else 1
+
+    # Adjust the score if there are fewer challengers than TOP_X_MINERS
+    score = (
+        compute_score(challenger_rank)
+        if len(scores) >= TOP_X_MINERS
+        else compute_score(challenger_rank)
+        + (
+            sign_modifier
+            * adjustment_factor
+            * (compute_score(challenger_rank) - compute_score(challenger_rank + 1))
+        )
+    )
+
+    return score
+
+
+def refresh_performance_metadata(
+    settings: sccs.Settings, result: ccm.ChallengeResult, score: Score
+):
+    attempts = ccu.apply_sma(
+        settings, score.performance_attempts, result.challenge_attempts, 0
+    )
+    btul.logging.trace(
+        f"[{score.uid}][Score][Performance] # of attempts {score.performance_attempts}",
+        prefix=settings.logging_name,
+    )
+
+    successes = ccu.apply_sma(
+        settings,
+        score.performance_successes,
+        result.challenge_successes,
+        0,
+    )
+    btul.logging.trace(
+        f"[{score.uid}][Score][Performance] # of successes {score.performance_successes}",
+        prefix=settings.logging_name,
+    )
+
+    # Compute a boost for challengees that request more attempts than the minimum
+    total_attempts = sum(attempts)
+    total_successes = sum(successes)
+    success_ratio = (total_successes / total_attempts) if total_attempts > 0 else 0
+
+    # Apply a non-linear boost for challengers attempting more than the minimum number of attempts
+    attempt_boost = (
+        total_attempts / settings.default_challenge_max_iteration
+    ) ** settings.performance_reward_exponent
+
+    # Penalize challengers who take more attempts but fail more
+    penalty = 1 - (1 - success_ratio) * settings.performance_penalty_factor
+    boost = success_ratio * attempt_boost * penalty
+
+    boosts = ccu.apply_sma(settings, score.performance_boost, boost, 0)
+
+    return attempts, successes, boosts
+
+
+# TODO: Put the score at 0 if the minimum attempts are not all successful, or find a way to not get 1 when you are alone!
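+# Worked example for the boost computed above, using the assumed defaults from
+# Settings (default_challenge_max_iteration = 1, performance_reward_exponent = 0.7,
+# performance_penalty_factor = 0.7): a challenger with 4 attempts and 3 successes
+# gets success_ratio = 0.75, attempt_boost = 4 ** 0.7 ~= 2.64,
+# penalty = 1 - 0.25 * 0.7 = 0.825, hence boost ~= 0.75 * 2.64 * 0.825 ~= 1.63
+# before SMA smoothing.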
+def compute_performance_score(
+    scores: Dict[str, Score],
+    challenger: Neuron,
+):
+    """
+    Compute the performance score of the uid
+    """
+    # Build the list of (hotkey, boost) pairs
+    boosts = [(x.hotkey, np.mean(x.performance_boost).item()) for x in scores.values()]
+
+    # Sort challengers by performance score in descending order
+    performance_scores_sorted = sorted(boosts, key=lambda x: x[1], reverse=True)
+
+    # Find the rank of the challenger
+    challenger_rank = next(
+        (
+            i
+            for i, x in enumerate(performance_scores_sorted)
+            if x[0] == challenger.hotkey
+        ),
+        -1,
+    )
+    if challenger_rank == -1:
+        return PERFORMANCE_FAILURE_REWARD
+
+    # Calculate the adjustment factor
+    adjustment_factor = (TOP_X_MINERS - len(scores)) / TOP_X_MINERS
+
+    # Set the sign modifier based on the index
+    sign_modifier = -1 if challenger_rank == 0 else 1
+
+    # Adjust the score if there are fewer challengers than TOP_X_MINERS
+    score = (
+        compute_score(challenger_rank)
+        if len(scores) >= TOP_X_MINERS
+        else compute_score(challenger_rank)
+        + (
+            sign_modifier
+            * adjustment_factor
+            * (compute_score(challenger_rank) - compute_score(challenger_rank + 1))
+        )
+    )
+
+    return score
+
+
+def compute_distribution_score(
+    settings: sccs.Settings,
+    challenger: Neuron,
+    scores: Dict[str, Score],
+):
+    """
+    Compute the distribution score of the challenger.
+    Only the top X miners receive a score; the rest receive 0.
+    """
+    # Sort the challengers by final score, with the distribution score overridden
+    # to a constant so it does not feed back into the ranking
+    sorted_challengers = sorted(
+        scores.values(),
+        key=lambda x: compute_final_score(settings, x, {"distribution": 1}),
+        reverse=True,
+    )
+
+    # Take the top X miners
+    top_challengers = sorted_challengers[:TOP_X_MINERS]
+
+    # Find the index of the challenger
+    challenger_rank = next(
+        (i for i, x in enumerate(top_challengers) if x.hotkey == challenger.hotkey), -1
+    )
+    if challenger_rank == -1:
+        return DISTRIBUTION_FAILURE_REWARD
+
+    # Compute the score using an exponential decay formula,
+    # which assigns diminishing values to items based on their rank
+    score = compute_score(challenger_rank)
+
+    return score
+
+
+def compute_final_score(settings: sccs.Settings, score: Score, overrides=None):
+    """
+    Compute the final score based on the different scores (availability, reliability, latency, performance and distribution)
+    """
+    overrides = overrides or {}
+
+    availability_score = overrides.get("availability", score.availability_score)
+    latency_score = overrides.get("latency", score.latency_score)
+    reliability_score = overrides.get("reliability", score.reliability_score)
+    performance_score = overrides.get("performance", score.performance_score)
+    distribution_score = overrides.get("distribution", score.distribution_score)
+
+    numerator = (
+        (settings.availability_weight * availability_score)
+        + (settings.latency_weight * latency_score)
+        + (settings.reliability_weight * reliability_score)
+        + (settings.distribution_weight * distribution_score)
+        + (settings.performance_weight * performance_score)
+    )
+
+    denominator = (
+        settings.availability_weight
+        + settings.latency_weight
+        + settings.reliability_weight
+        + settings.distribution_weight
+        + settings.performance_weight
+    )
+
+    score = numerator / denominator if denominator != 0 else 0
+
+    return score
+
+
+def compute_score(index):
+    return BASE_SCORE * np.exp(-DECAY_RATE * index).item()
diff --git a/subvortex/validator/core/challenger/settings.py b/subvortex/validator/core/challenger/settings.py
new file mode 100644
index 00000000..a6c49e5e
--- /dev/null
+++ b/subvortex/validator/core/challenger/settings.py
@@ -0,0 +1,80 @@
+from dataclasses import dataclass, field
+
+import subvortex.core.settings_utils as scsu
+
+
+@dataclass
+class Settings:
+    logging_name: str = field(default="Challenger", metadata={"readonly": True})
+    """
+    Prefix to use when logging
+    """
+
+    netuid: int = 7
+    """
+    UID of the subnet
+    """
+
+    default_challenge_max_iteration: int = 1  # TODO: Restore 64 when going to prod
+    """
+    Number of connections that can be opened to a node at the same time.
+    """
+
+    challenge_timeout: int = 5
+    """
+    Duration in seconds of the challenge
+    """
+
+    max_challenge_time_per_miner: int = 2
+    """
+    Maximum time to challenge a miner
+    """
+
+    challenge_period: int = 50
+    """
+    Period to take into account to compute the SMA score
+    """
+
+    moving_score_alpha: float = 0.5
+    """
+    Factor controlling the weight of recent scores in a moving average calculation.
+    """
+
+    availability_weight: int = 8
+    """
+    Weight to use for the availability score in the computation of the final score
+    """
+
+    reliability_weight: int = 3
+    """
+    Weight to use for the reliability score in the computation of the final score
+    """
+
+    latency_weight: int = 7
+    """
+    Weight to use for the latency score in the computation of the final score
+    """
+
+    performance_weight: int = 7
+    """
+    Weight to use for the performance score in the computation of the final score
+    """
+
+    distribution_weight: int = 2
+    """
+    Weight to use for the distribution score in the computation of the final score
+    """
+
+    performance_reward_exponent: float = 0.7
+    """
+    Controls the non-linear scaling of rewards based on the number of challenge attempts. Higher values amplify rewards for handling more attempts.
+    """
+
+    performance_penalty_factor: float = 0.7
+    """
+    Reduces the score for challengers with a low success ratio, penalizing excessive failed attempts.
+    """
+
+    @classmethod
+    def create(cls) -> "Settings":
+        return scsu.create_settings_instance(cls)
diff --git a/subvortex/validator/core/challenger/utils.py b/subvortex/validator/core/challenger/utils.py
new file mode 100644
index 00000000..269d6072
--- /dev/null
+++ b/subvortex/validator/core/challenger/utils.py
@@ -0,0 +1,28 @@
+# The MIT License (MIT)
+# Copyright © 2025 Eclipse Vortex
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+# documentation files (the “Software”), to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in all copies or substantial portions of
+# the Software.
+
+# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+# DEALINGS IN THE SOFTWARE.
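+# apply_sma maintains a fixed-length rolling window of scores: it left-pads the
+# history with a default value up to settings.challenge_period entries, drops the
+# oldest entry and appends the newest. Illustrative example with a hypothetical
+# period of 5: apply_sma(settings, [1, 0, 1], 1, 0) pads to [0, 0, 1, 0, 1],
+# keeps the last four entries [0, 1, 0, 1] and appends the new score, giving
+# [0, 1, 0, 1, 1].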
+import subvortex.validator.core.challenger.settings as sccs
+
+
+def apply_sma(
+    settings: sccs.Settings,
+    scores: list,
+    score: int | float,
+    default_score: int | float,
+):
+    return ([default_score] * max(0, settings.challenge_period - len(scores)) + scores)[
+        -settings.challenge_period + 1 :
+    ] + [score]
diff --git a/subvortex/validator/core/model/challenge/__init__.py b/subvortex/validator/core/model/challenge/__init__.py
new file mode 100644
index 00000000..d4d4db40
--- /dev/null
+++ b/subvortex/validator/core/model/challenge/__init__.py
@@ -0,0 +1,7 @@
+from .challenge import Challenge
+from .model_challenge_210 import ChallengeModel as ChallengeModel210
+
+__all__ = [
+    "Challenge",
+    "ChallengeModel210"
+]
diff --git a/subvortex/validator/core/model/challenge/challenge.py b/subvortex/validator/core/model/challenge/challenge.py
new file mode 100644
index 00000000..0b214581
--- /dev/null
+++ b/subvortex/validator/core/model/challenge/challenge.py
@@ -0,0 +1,65 @@
+from typing import Any
+from dataclasses import dataclass
+
+
+@dataclass
+class Challenge:
+    schedule_id: str
+    params: Any
+    block_hash: str
+    value: Any
+    process_time: float
+
+    def __init__(
+        self,
+        schedule_id: str,
+        params: Any,
+        block_hash: str,
+        value: Any,
+        process_time: float = 0.0,
+    ):
+        self.schedule_id = schedule_id
+        self.params = params
+        self.block_hash = block_hash
+        self.value = value
+        self.process_time = process_time
+
+    @property
+    def id(self):
+        return self.schedule_id
+
+    @staticmethod
+    def create(
+        schedule_id: str,
+        process_time: float,
+        challenge: tuple | None,
+    ):
+        params, block_hash, value = challenge or (None, None, None)
+        return Challenge(
+            schedule_id=schedule_id,
+            params=params,
+            block_hash=block_hash,
+            value=value,
+            process_time=process_time,
+        )
+
+    @staticmethod
+    def from_dict(mapping):
+        """Converts a dictionary to a Challenge instance."""
+        return Challenge(
+            schedule_id=mapping["schedule_id"],
+            params=mapping["params"],
+            block_hash=mapping["block_hash"],
+            value=mapping["value"],
+            process_time=float(mapping["process_time"]),
+        )
+
+    def to_dict(self):
+        """Converts a Challenge instance to a dictionary"""
+        return {
+            "schedule_id": self.schedule_id,
+            "params": self.params or "",
+            "block_hash": self.block_hash or "",
+            "value": self.value or "",
+            "process_time": self.process_time,
+        }
diff --git a/subvortex/validator/core/model/challenge/model_challenge_210.py b/subvortex/validator/core/model/challenge/model_challenge_210.py
new file mode 100644
index 00000000..51429dfd
--- /dev/null
+++ b/subvortex/validator/core/model/challenge/model_challenge_210.py
@@ -0,0 +1,63 @@
+from redis import asyncio as Redis
+
+from subvortex.validator.core.model.challenge import Challenge
+from subvortex.core.database.database_utils import decode_hash
+
+
+class ChallengeModel:
+    """
+    Versioned model for storing and retrieving per-hotkey challenges from Redis.
+    This is version 2.1.0 of the model.
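+    Each challenge is stored as a Redis hash under the key pattern
+    "sv:challenge:{hotkey}" (see _key below).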
+ """ + + version = "2.1.0" + + def _key(self, hotkey: str) -> str: + return f"sv:challenge:{hotkey}" + + async def read(self, redis: Redis, hotkey: str) -> Challenge | None: + key = self._key(hotkey) + raw = await redis.hgetall(key) + if not raw: + return None + data = decode_hash(raw) + return Challenge.from_dict(data) + + async def read_all(self, redis: Redis) -> dict[str, Challenge]: + miners: dict[str, Challenge] = {} + + async for key in redis.scan_iter(match=self._key("*")): + decoded_key = key.decode() + raw = await redis.hgetall(decoded_key) + if not raw: + continue + + ss58_address = decoded_key.split("sv:challenge:")[1] + data = decode_hash(raw) + miners[ss58_address] = Challenge.from_dict(data) + + return miners + + async def write(self, redis: Redis, hotkey: str, challenge: Challenge): + key = self._key(hotkey) + data = Challenge.to_dict(challenge) + await redis.hmset(key, data) + + async def write_all(self, redis: Redis, schedules: list[Challenge]): + async with redis.pipeline() as pipe: + for challenge in schedules: + key = self._key(challenge.hotkey) + data = Challenge.to_dict(challenge) + pipe.hmset(key, data) + await pipe.execute() + + async def delete(self, redis: Redis, hotkey: str): + key = self._key(hotkey) + await redis.delete(key) + + async def delete_all(self, redis: Redis, schedules: list[Challenge]): + async with redis.pipeline() as pipe: + for challenge in schedules: + key = self._key(challenge.hotkey) + pipe.delete(key) + await pipe.execute() diff --git a/subvortex/validator/neuron/src/models/miner/__init__.py b/subvortex/validator/core/model/miner/__init__.py similarity index 100% rename from subvortex/validator/neuron/src/models/miner/__init__.py rename to subvortex/validator/core/model/miner/__init__.py diff --git a/subvortex/validator/neuron/src/models/miner/miner.py b/subvortex/validator/core/model/miner/miner.py similarity index 100% rename from subvortex/validator/neuron/src/models/miner/miner.py rename to subvortex/validator/core/model/miner/miner.py diff --git a/subvortex/validator/neuron/src/models/miner/model_miner_210.py b/subvortex/validator/core/model/miner/model_miner_210.py similarity index 97% rename from subvortex/validator/neuron/src/models/miner/model_miner_210.py rename to subvortex/validator/core/model/miner/model_miner_210.py index ac770725..8052c913 100644 --- a/subvortex/validator/neuron/src/models/miner/model_miner_210.py +++ b/subvortex/validator/core/model/miner/model_miner_210.py @@ -1,7 +1,7 @@ from redis import asyncio as Redis from subvortex.core.database.database_utils import decode_hash -from subvortex.validator.neuron.src.models.miner import Miner +from subvortex.validator.core.model.miner import Miner class MinerModel: diff --git a/subvortex/validator/core/model/schedule/__init__.py b/subvortex/validator/core/model/schedule/__init__.py new file mode 100644 index 00000000..0f294d95 --- /dev/null +++ b/subvortex/validator/core/model/schedule/__init__.py @@ -0,0 +1,7 @@ +from .schedule import Schedule +from .model_schedule_210 import ScheduleModel as ScheduleModel210 + +__all__ = [ + "Schedule", + "ScheduleModel210" +] diff --git a/subvortex/validator/core/model/schedule/model_schedule_210.py b/subvortex/validator/core/model/schedule/model_schedule_210.py new file mode 100644 index 00000000..5001e7c0 --- /dev/null +++ b/subvortex/validator/core/model/schedule/model_schedule_210.py @@ -0,0 +1,63 @@ +from redis import asyncio as Redis + +from subvortex.validator.core.model.schedule import Schedule +from 
+
+
+class ScheduleModel:
+    """
+    Versioned model for storing and retrieving per-hotkey schedules from Redis.
+    This is version 2.1.0 of the model.
+    """
+
+    version = "2.1.0"
+
+    def _key(self, hotkey: str) -> str:
+        return f"sv:schedule:{hotkey}"
+
+    async def read(self, redis: Redis, hotkey: str) -> Schedule | None:
+        key = self._key(hotkey)
+        raw = await redis.hgetall(key)
+        if not raw:
+            return None
+        data = decode_hash(raw)
+        return Schedule.from_dict(data)
+
+    async def read_all(self, redis: Redis) -> dict[str, Schedule]:
+        schedules: dict[str, Schedule] = {}
+
+        async for key in redis.scan_iter(match=self._key("*")):
+            decoded_key = key.decode()
+            raw = await redis.hgetall(decoded_key)
+            if not raw:
+                continue
+
+            ss58_address = decoded_key.split("sv:schedule:")[1]
+            data = decode_hash(raw)
+            schedules[ss58_address] = Schedule.from_dict(data)
+
+        return schedules
+
+    async def write(self, redis: Redis, hotkey: str, schedule: Schedule):
+        key = self._key(hotkey)
+        data = Schedule.to_dict(schedule)
+        await redis.hmset(key, data)
+
+    async def write_all(self, redis: Redis, schedules: list[Schedule]):
+        async with redis.pipeline() as pipe:
+            for schedule in schedules:
+                key = self._key(schedule.hotkey)
+                data = Schedule.to_dict(schedule)
+                pipe.hmset(key, data)
+            await pipe.execute()
+
+    async def delete(self, redis: Redis, hotkey: str):
+        key = self._key(hotkey)
+        await redis.delete(key)
+
+    async def delete_all(self, redis: Redis, schedules: list[Schedule]):
+        async with redis.pipeline() as pipe:
+            for schedule in schedules:
+                key = self._key(schedule.hotkey)
+                pipe.delete(key)
+            await pipe.execute()
diff --git a/subvortex/validator/core/model/schedule/schedule.py b/subvortex/validator/core/model/schedule/schedule.py
new file mode 100644
index 00000000..7956e7ba
--- /dev/null
+++ b/subvortex/validator/core/model/schedule/schedule.py
@@ -0,0 +1,99 @@
+from dataclasses import dataclass, fields
+
+
+@dataclass
+class Schedule:
+    index: int
+    instance: int
+    cycle_start: int
+    cycle_end: int
+    block_start: int
+    block_end: int
+    country: str
+
+    def __init__(
+        self,
+        index: int,
+        instance: int,
+        cycle_start: int,
+        cycle_end: int,
+        block_start: int,
+        block_end: int,
+        country: str,
+    ):
+        self.index = index
+        self.instance = instance
+        self.cycle_start = cycle_start
+        self.cycle_end = cycle_end
+        self.block_start = block_start
+        self.block_end = block_end
+        self.country = country
+
+    @property
+    def id(self):
+        return (
+            f"{self.cycle_start}-{self.cycle_end}-{self.block_start}-{self.block_end}"
+        )
+
+    @property
+    def step_index(self):
+        return self.index + 1
+
+    @property
+    def instance_index(self):
+        return self.instance + 1
+
+    @staticmethod
+    def create(
+        index: int,
+        instance: int,
+        cycle_start: int,
+        cycle_end: int,
+        block_start: int,
+        block_end: int,
+        country: str,
+    ):
+        return Schedule(
+            index=index,
+            instance=instance,
+            cycle_start=cycle_start,
+            cycle_end=cycle_end,
+            block_start=block_start,
+            block_end=block_end,
+            country=country,
+        )
+
+    @staticmethod
+    def from_dict(mapping):
+        """Converts a dictionary to a Schedule instance."""
+        return Schedule(
+            index=int(mapping["index"]),
+            instance=int(mapping["instance"] or 1),
+            cycle_start=int(mapping["cycle_start"]),
+            cycle_end=int(mapping["cycle_end"]),
+            block_start=int(mapping["block_start"]),
+            block_end=int(mapping["block_end"]),
+            country=mapping["country"],
+        )
+
+    def to_dict(self):
+        """Converts a Schedule instance to a dictionary"""
dictionary""" + return { + "index": self.index, + "instance": self.instance, + "cycle_start": self.cycle_start, + "cycle_end": self.cycle_end, + "block_start": self.block_start, + "block_end": self.block_end, + "country": self.country, + } + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Schedule): + return NotImplemented + + for f in fields(self): + if getattr(self, f.name) != getattr(other, f.name): + return False + return True diff --git a/subvortex/validator/core/model/score/score.py b/subvortex/validator/core/model/score/score.py new file mode 100644 index 00000000..fe2df084 --- /dev/null +++ b/subvortex/validator/core/model/score/score.py @@ -0,0 +1,178 @@ +from typing import List +from dataclasses import dataclass, fields + +@dataclass +class Score: + uid: int + hotkey: str + schedule_id: str + success: bool + reason: str | None + + # Metadata + availability_attempts: List[int] + availability_successes: List[int] + reliability_attempts: List[int] + reliability_successes: List[int] + latency_times: List[float] + performance_attempts: List[int] + performance_successes: List[int] + performance_boost: List[float] + + # Scores + availability_score: float + reliability_score: float + latency_score: float + performance_score: float + distribution_score: float + final_score: float + moving_score: float + + def __init__( + self, + uid: int, + hotkey: str, + schedule_id: str = "", + availability_attempts: List[int] = [], + availability_successes: List[int] = [], + reliability_attempts: List[int] = [], + reliability_successes: List[int] = [], + latency_times: List[float] = [], + performance_attempts: List[int] = [], + performance_successes: List[int] = [], + performance_boost: List[float] = [], + availability_score: float = 0.0, + reliability_score: float = 0.0, + latency_score: float = 0.0, + distribution_score: float = 0.0, + performance_score: float = 0.0, + final_score: float = 0.0, + moving_score: float = 0.0, + success: bool = False, + reason: str | None = None, + ): + self.uid = uid + self.hotkey = hotkey + self.schedule_id = schedule_id + self.availability_attempts = availability_attempts + self.availability_successes = availability_successes + self.reliability_attempts = reliability_attempts + self.reliability_successes = reliability_successes + self.latency_times = latency_times + self.performance_attempts = performance_attempts + self.performance_successes = performance_successes + self.performance_boost = performance_boost + self.availability_score = availability_score + self.reliability_score = reliability_score + self.latency_score = latency_score + self.distribution_score = distribution_score + self.performance_score = performance_score + self.final_score = final_score + self.moving_score = moving_score + self.success = success + self.reason = reason + + @staticmethod + def convert_to(mapping): + """Converts a dictionary to a Score instance.""" + return Score( + uid=int(mapping["uid"]), + hotkey=mapping["hotkey"], + schedule_id=mapping["schedule_id"], + availability_attempts=( + list(map(int, mapping.get("availability_attempts", "").split(","))) + if mapping.get("availability_attempts") + else [] + ), + availability_successes=( + list(map(int, mapping.get("availability_successes", "").split(","))) + if mapping.get("availability_successes") + else [] + ), + reliability_attempts=( + list(map(int, mapping.get("reliability_attempts", "").split(","))) + if mapping.get("reliability_attempts") + else [] + ), + reliability_successes=( + list(map(int, 
mapping.get("reliability_successes", "").split(","))) + if mapping.get("reliability_successes") + else [] + ), + latency_times=( + list(map(float, mapping.get("latency_times", "").split(","))) + if mapping.get("latency_times") + else [] + ), + performance_attempts=( + list(map(int, mapping.get("performance_attempts", "").split(","))) + if mapping.get("performance_attempts") + else [] + ), + performance_successes=( + list(map(int, mapping.get("performance_successes", "").split(","))) + if mapping.get("performance_successes") + else [] + ), + performance_boost=( + list(map(float, mapping.get("performance_boost", "").split(","))) + if mapping.get("performance_boost") + else [] + ), + availability_score=float(mapping.get("availability_score", 0.0)), + reliability_score=float(mapping.get("reliability_score", 0.0)), + latency_score=float(mapping.get("latency_score", 0.0)), + distribution_score=float(mapping.get("distribution_score", 0.0)), + performance_score=float(mapping.get("performance_score", 0.0)), + final_score=float(mapping.get("final_score", 0.0)), + moving_score=float(mapping.get("moving_score", 0.0)), + success=bool(mapping.get("success")), + reason=mapping.get("reason"), + ) + + def convert_from(self): + """Converts a Score instance to a dictionary""" + return { + "uid": self.uid, + "hotkey": self.hotkey, + "schedule_id": self.schedule_id, + "availability_attempts": ",".join(map(str, self.availability_attempts)), + "availability_successes": ",".join(map(str, self.availability_successes)), + "reliability_attempts": ",".join(map(str, self.reliability_attempts)), + "reliability_successes": ",".join(map(str, self.reliability_successes)), + "latency_times": ",".join(map(str, self.latency_times)), + "performance_attempts": ",".join(map(str, self.performance_attempts)), + "performance_successes": ",".join(map(str, self.performance_successes)), + "performance_boost": ",".join(map(str, self.performance_boost)), + "availability_score": self.availability_score, + "reliability_score": self.reliability_score, + "latency_score": self.latency_score, + "distribution_score": self.distribution_score, + "performance_score": self.performance_score, + "final_score": self.final_score, + "moving_score": self.moving_score, + "success": int(self.success), + "reason": self.reason or "", + } + + def __str__(self): + """Returns a human-readable string representation of the Score instance.""" + return ( + f"Score(schedule_id={self.schedule_id}, success={self.success}, reason={self.reason}, availability_attempts={self.availability_attempts}, availability_successes={self.availability_successes}, reliability_attempts={self.reliability_attempts}, " + f"reliability_successes={self.reliability_successes}, latency_times={self.latency_times}, " + f"performance_attempts={self.performance_attempts}, performance_successes={self.performance_successes}, performance_boost={self.performance_boost}," + f"availability_score={self.availability_score}, reliability_score={self.reliability_score}, " + f"latency_score={self.latency_score}, distribution_score={self.distribution_score}, " + f"performance_score={self.performance_score}, final_score={self.final_score}, moving_score={self.moving_score})" + ) + + def __repr__(self): + """Returns a more detailed string representation useful for debugging.""" + return ( + f"Score(schedule_id={self.schedule_id}, success={self.success}, reason={self.reason}, availability_attempts={self.availability_attempts}, availability_successes={self.availability_successes}, 
+
+    def __repr__(self):
+        """Returns a more detailed string representation useful for debugging."""
+        return (
+            f"Score(schedule_id={self.schedule_id!r}, success={self.success!r}, reason={self.reason!r}, availability_attempts={self.availability_attempts!r}, availability_successes={self.availability_successes!r}, reliability_attempts={self.reliability_attempts!r}, "
+            f"reliability_successes={self.reliability_successes!r}, latency_times={self.latency_times!r}, "
+            f"performance_attempts={self.performance_attempts!r}, performance_successes={self.performance_successes!r}, performance_boost={self.performance_boost!r}, "
+            f"availability_score={self.availability_score!r}, reliability_score={self.reliability_score!r}, "
+            f"latency_score={self.latency_score!r}, distribution_score={self.distribution_score!r}, "
+            f"performance_score={self.performance_score!r}, final_score={self.final_score!r}, moving_score={self.moving_score!r})"
+        )
diff --git a/subvortex/validator/core/tests/__init__.py b/subvortex/validator/core/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/subvortex/validator/core/tests/conftest.py b/subvortex/validator/core/tests/conftest.py
new file mode 100644
index 00000000..b07890fe
--- /dev/null
+++ b/subvortex/validator/core/tests/conftest.py
@@ -0,0 +1,47 @@
+# The MIT License (MIT)
+# Copyright © 2024 Eclipse Vortex
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+# documentation files (the “Software”), to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in all copies or substantial portions of
+# the Software.
+
+# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+# DEALINGS IN THE SOFTWARE.
+import pytest
+from hashlib import sha256
+from typing import Optional
+
+import subvortex.core.core_bittensor.wallet as scbtw
+import subvortex.core.shared.mock as scsm
+
+
+def get_block_hash(block: Optional[int] = None) -> str:
+    return "0x" + sha256(str(block).encode()).hexdigest()[:64]
+
+
+def make_async(method):
+    """Wraps a mock's return value in an async function."""
+
+    async def async_wrapper(*args, **kwargs):
+        return method(*args, **kwargs)
+
+    return async_wrapper
+
+
+@pytest.fixture(scope="session", autouse=False)
+def subtensor():
+    wallet = scbtw.get_mock_wallet()
+    subtensor = scsm.MockSubtensor(netuid=1, wallet=wallet)
+
+    # Make some sync methods async
+    subtensor.get_block_hash = make_async(subtensor.get_block_hash)
+    subtensor.substrate.get_block_hash = make_async(get_block_hash)
+
+    yield subtensor
diff --git a/subvortex/validator/core/tests/mock/neuron.py b/subvortex/validator/core/tests/mock/neuron.py
new file mode 100644
index 00000000..50f6face
--- /dev/null
+++ b/subvortex/validator/core/tests/mock/neuron.py
@@ -0,0 +1,57 @@
+import subvortex.core.model.neuron.neuron as scmnn
+
+
+def create_validator(*args, **kargs):
+    params = {**kargs}
+    # Resolve the uid first so the derived defaults below use the requested uid
+    uid = params.get("uid") or 1
+    return scmnn.Neuron(
+        uid=uid,
+        hotkey=params.get("hotkey") or f"hotkey_{uid}",
+        coldkey=params.get("coldkey") or f"coldkey_{uid}",
+        netuid=params.get("netuid") or 1,
+        active=params.get("active", True),
+        stake=params.get("stake") or 10.0 * uid,
+        total_stake=params.get("total_stake") or 20.0 * uid,
+        rank=params.get("rank") or 0.9,
+        emission=params.get("emission") or 0.5,
+        incentive=params.get("incentive") or 0,
+        consensus=params.get("consensus") or 0.95,
+        trust=params.get("trust") or 0,
+        validator_trust=params.get("validator_trust") or 0.99,
+        dividends=params.get("dividends") or 0,
+        last_update=params.get("last_update") or 123456789,
+        validator_permit=params.get("validator_permit", True),
+        ip=params.get("ip") or "0.0.0.0",
+        port=params.get("port") or 8080 + uid,
+        version=params.get("version") or "1.0.0",
+        is_serving=params.get("is_serving", True),
+        country=params.get("country") or "US",
+    )
+
+
+def create_miner(*args, **kargs):
+    params = {**kargs}
+    # Resolve the uid first so the derived defaults below use the requested uid
+    uid = params.get("uid") or 1
+    return scmnn.Neuron(
+        uid=uid,
+        hotkey=params.get("hotkey") or f"hotkey_{uid}",
+        coldkey=params.get("coldkey") or f"coldkey_{uid}",
+        netuid=params.get("netuid") or 1,
+        active=params.get("active", True),
+        stake=params.get("stake") or 0,
+        total_stake=params.get("total_stake") or 0,
+        rank=params.get("rank") or 0.9,
+        emission=params.get("emission") or 0.5,
+        incentive=params.get("incentive") or 0.8,
+        consensus=params.get("consensus") or 0.95,
+        trust=params.get("trust") or 0.99,
+        validator_trust=params.get("validator_trust") or 0,
+        dividends=params.get("dividends") or 0,
+        last_update=params.get("last_update") or 123456789,
+        validator_permit=params.get("validator_permit", True),
+        ip=params.get("ip") or f"192.168.1.{uid}",
+        port=params.get("port") or 8080 + uid,
+        version=params.get("version") or "1.0.0",
+        is_serving=params.get("is_serving", True),
+        country=params.get("country") or "US",
+    )
diff --git a/subvortex/validator/core/tests/src/test_scheduler.py b/subvortex/validator/core/tests/src/test_scheduler.py
new file mode 100644
index 00000000..cd7d7234
--- /dev/null
+++ b/subvortex/validator/core/tests/src/test_scheduler.py
@@ -0,0 +1,196 @@
+import pytest
+
+import subvortex.validator.core.challenger.settings as svcce
+import subvortex.validator.core.challenger.scheduler as svccs
+
+from subvortex.validator.core.tests.mock.neuron import create_validator
+
+
+@pytest.mark.asyncio
+async def test_get_schedules_when_the_number_of_validators_is_less_than_the_number_of_country_should_compute_the_right_schedule(
+    subtensor,
+):
+    # Arrange
+    settings = svcce.Settings()
+
+    countries = [
+        ("US", 3),
+        ("JP", 26),
+        ("FR", 20),
+        ("GB", 1),
+        ("MX", 10),
+        ("CA", 5),
+        ("IT", 17),
+        ("SP", 2),
+        ("HK", 15),
+        ("TW", 1),
+    ]
+
+    validators = [
+        create_validator(
+            uid=i + 1,
+            country="US",
+            hotkey="5F3sa2TJXJzYqSRyyoPybRhFZ9iSQ8mbHTZ4s5bSjG5z7V" + countries[i][0],
+            total_stake=10 * (i + 1),
+        )
+        for i in range(5)
+    ]
+
+    cycle = svccs.get_next_cycle(
+        settings=settings, netuid=92, block=4095736, countries=countries
+    )
+
+    # Act
+    schedules = await svccs.get_schedules(
+        substrate=subtensor.substrate,
+        settings=settings,
+        cycle=cycle,
+        validators=validators,
+        countries=countries,
+    )
+
+    # Assert
+    # Check all validators have a schedule
+    assert len(validators) == len(schedules)
+
+    for schedule in schedules.values():
+        # Check each validator contains a step for each country
+        scheduled_countries = [x.country for x in schedule]
+        assert len(countries) == len(scheduled_countries)
+        assert len(countries) == len(set(scheduled_countries))
+
+        # Check step by step
+        for i in range(len(schedule)):
+            # # Check no collision in terms of country across all validators
+            # step_countries = [x[i].country for x in schedules.values()]
+            # assert len(countries) == len(set(step_countries))
+
+            # Check the cycle
+            cycle_start = [x[i].cycle_start for x in schedules.values()]
+            assert 1 == len(set(cycle_start))
+            assert cycle.start == cycle_start[0]
+
+            cycle_end = [x[i].cycle_end for x in schedules.values()]
+            assert 1 == len(set(cycle_end))
+            assert cycle.stop == cycle_end[0]
+
+            step_start = cycle.start if i == 0 else schedule[i - 1].block_end
+            block_start = [x[i].block_start for x in schedules.values()]
+            assert 1 == len(set(block_start))
+            assert step_start == block_start[0]
+
+            step_end = (
+                cycle.stop if i >= len(schedule) - 1 else schedule[i + 1].block_start
+            )
+            block_end = [x[i].block_end for x in schedules.values()]
+            assert 1 == len(set(block_end))
+            assert step_end == block_end[0]
+
+        # Check validator by validator
+        for i, step in enumerate(schedule):
+            # # Check the cycle
+            # assert cycle.start == step.cycle_start
+            # assert cycle.stop == step.cycle_end
+
+            # Check the step
+            step_start = cycle.start if i == 0 else schedule[i - 1].block_end
+            assert step_start == step.block_start
+
+            step_end = (
+                schedule[i + 1].block_start if i < len(schedule) - 1 else cycle.stop
+            )
+            assert step_end == step.block_end
+
+    # With fewer validators than countries, each step covers only a subset of the countries
+    for i in range(len(countries)):
+        scheduled_country = [x[i].country for x in schedules.values()]
+        assert len(validators) == len(scheduled_country)
+        assert len(countries) != len(scheduled_country)
+
+
+@pytest.mark.asyncio
+async def test_get_schedules_when_the_number_of_validators_is_greater_than_the_number_of_country_should_compute_the_right_schedule(
+    subtensor,
+):
+    # Arrange
+    settings = svcce.Settings()
+
+    countries = [("US", 3), ("JP", 26), ("FR", 20), ("GB", 1), ("MX", 10)]
+
+    validators = [
+        create_validator(
+            uid=i + 1,
+            country="US",
+            hotkey=f"5F3sa2TJXJzYqSRyyoPybRhFZ9iSQ8mbHTZ4s5bSjG5z7V{i}",
+        )
+        for i in range(10)
+    ]
+
+    cycle = svccs.get_next_cycle(
+        settings=settings, netuid=92, block=4095736, countries=countries
+    )
+
+    # Act
+    schedules = await svccs.get_schedules(
+        substrate=subtensor.substrate,
+        settings=settings,
+        cycle=cycle,
+        validators=validators,
+        countries=countries,
+    )
+
+    # Assert
+    # Check not all the validators have a schedule
+    assert len(validators) > len(schedules)
+
+    for schedule in schedules.values():
+        # Check each validator contains a step for each country
+        scheduled_countries = [x.country for x in schedule]
+        assert len(countries) == len(scheduled_countries)
+        assert len(countries) == len(set(scheduled_countries))
+
+        # Check step by step
+        for i in range(len(schedule)):
+            # Check no collision in terms of country across all validators
+            step_countries = [x[i].country for x in schedules.values()]
+            assert len(countries) == len(set(step_countries))
+
+            # Check the cycle
+            cycle_start = [x[i].cycle_start for x in schedules.values()]
+            assert 1 == len(set(cycle_start))
+            assert cycle.start == cycle_start[0]
+
+            cycle_end = [x[i].cycle_end for x in schedules.values()]
+            assert 1 == len(set(cycle_end))
+            assert cycle.stop == cycle_end[0]
+
+            step_start = cycle.start if i == 0 else schedule[i - 1].block_end
+            block_start = [x[i].block_start for x in schedules.values()]
+            assert 1 == len(set(block_start))
+            assert step_start == block_start[0]
+
+            step_end = (
+                cycle.stop if i >= len(schedule) - 1 else schedule[i + 1].block_start
+            )
+            block_end = [x[i].block_end for x in schedules.values()]
+            assert 1 == len(set(block_end))
+            assert step_end == block_end[0]
+
+        for i, step in enumerate(schedule):
+            # Check the cycle
+            assert cycle.start == step.cycle_start
+            assert cycle.stop == step.cycle_end
+
+            # Check the step
+            step_start = cycle.start if i == 0 else schedule[i - 1].block_end
+            assert step_start == step.block_start
+
+            step_end = (
+                schedule[i + 1].block_start if i < len(schedule) - 1 else cycle.stop
+            )
+            assert step_end == step.block_end
+
+    # Check for collision: every step covers each country exactly once
+    for i in range(len(countries)):
+        scheduled_country = [x[i].country for x in schedules.values()]
+        assert len(countries) == len(set(scheduled_country))