Skip to content

Commit 7d779ad

Browse files
committed
investigate async issue
1 parent eb3d5bb commit 7d779ad

11 files changed

Lines changed: 330 additions & 182 deletions

File tree

subvortex/core/core_bittensor/subtensor/subtensor.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -294,6 +294,8 @@ async def watchdog():
294294
f"No block received in the last {timeout} seconds"
295295
)
296296

297+
btul.logging.trace("🚀 Starting block subscription and watchdog.", prefix="ReliableSubtensor")
298+
297299
# Run handler + watchdog concurrently
298300
await asyncio.gather(
299301
subtensor.substrate._get_block_handler(
@@ -313,10 +315,12 @@ async def watchdog():
313315
prefix="ReliableSubtensor",
314316
)
315317

318+
btul.logging.info("🔁 Reinstantiating substrate connection...", prefix="ReliableSubtensor")
316319
await subtensor.substrate._reinstantiate_substrate()
317320

318321
if attempt > 0:
319322
# Wait avg time of a block
323+
btul.logging.debug("⏲️ Waiting before retrying subscription...", prefix="ReliableSubtensor")
320324
await asyncio.sleep(12)
321325

322326
attempt += 1

subvortex/core/database/database.py

Lines changed: 45 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -1,107 +1,123 @@
1+
import asyncio
12
from redis import asyncio as aioredis
23
from packaging.version import parse as parse_version
4+
from weakref import WeakKeyDictionary
35

46
import bittensor.utils.btlogging as btul
5-
67
from subvortex.core.database.database_utils import decode_value
78

89

910
class Database:
1011
def __init__(self, settings):
1112
self.models = {}
1213
self.settings = settings
13-
self.database = None
14+
self._clients = WeakKeyDictionary() # Cache clients per event loop
1415

15-
async def connect(self):
16-
self.database = aioredis.StrictRedis(
16+
def _new_client(self):
17+
return aioredis.StrictRedis(
1718
host=self.settings.database_host,
1819
port=self.settings.database_port,
1920
db=self.settings.database_index,
2021
password=self.settings.database_password,
2122
)
2223

23-
btul.logging.info("Connected to Redis", prefix=self.settings.logging_name)
24+
def _get_loop(self):
25+
return asyncio.get_running_loop()
26+
27+
async def get_client(self):
28+
loop = self._get_loop()
29+
30+
if loop in self._clients:
31+
return self._clients[loop]
32+
33+
client = self._new_client()
34+
self._clients[loop] = client
35+
36+
btul.logging.info(
37+
"Created new Redis client for event loop", prefix=self.settings.logging_name
38+
)
39+
return client
2440

2541
async def is_connection_alive(self) -> bool:
42+
client = await self.get_client()
43+
2644
try:
27-
pong = await self.database.ping()
45+
pong = await client.ping()
2846
return pong is True
2947
except Exception as e:
30-
btul.logging.warning(f"Redis connection check failed: {e}")
48+
btul.logging.warning(
49+
f"Redis connection check failed: {e}", prefix=self.settings.logging_name
50+
)
3151
return False
3252

3353
async def ensure_connection(self):
34-
if self.database is None or not await self.is_connection_alive():
54+
client = await self.get_client()
55+
56+
if not await self.is_connection_alive():
3557
btul.logging.warning(
36-
"Reconnecting to Redis...",
58+
"Redis ping failed, but client will be reused",
3759
prefix=self.settings.logging_name,
3860
)
39-
await self.connect()
61+
# You may optionally recreate here if needed
4062

4163
async def wait_until_ready(self, name: str):
42-
# Ensure the connection is ip and running
4364
await self.ensure_connection()
4465

66+
client = await self.get_client()
67+
4568
message_key = self._key(f"state:{name}")
4669
stream_key = self._key(f"state:{name}:stream")
4770
last_id = "$"
4871

4972
try:
50-
# Step 1: check the message key first
51-
snapshot = await self.database.get(message_key)
73+
snapshot = await client.get(message_key)
5274
if snapshot and snapshot.decode() == "ready":
53-
btul.logging.trace(
75+
btul.logging.debug(
5476
f"{name} is already ready (via message key)",
5577
prefix=self.settings.logging_name,
5678
)
5779
return
5880

59-
# Step 2: wait for stream messages
60-
btul.logging.trace(
81+
btul.logging.debug(
6182
f"Waiting on stream: {stream_key}", prefix=self.settings.logging_name
6283
)
6384
while True:
64-
entries = await self.database.xread({stream_key: last_id}, block=0)
85+
entries = await client.xread({stream_key: last_id}, block=0)
6586
if not entries:
6687
continue
6788

6889
for stream_key, messages in entries:
69-
btul.logging.trace(
90+
btul.logging.debug(
7091
f"Received stream message: {messages}",
7192
prefix=self.settings.logging_name,
7293
)
7394
for msg_id, fields in messages:
74-
state = fields.get("state".encode(), b"").decode()
95+
state = fields.get(b"state", b"").decode()
7596
if state == "ready":
76-
btul.logging.trace(
97+
btul.logging.debug(
7798
f"{name} is now ready (via stream)",
7899
prefix=self.settings.logging_name,
79100
)
80101
return
81-
last_id = msg_id # move forward
102+
last_id = msg_id
82103
except Exception as err:
83104
btul.logging.warning(
84105
f"Failed to read the state of {name}: {err}",
85106
prefix=self.settings.logging_name,
86107
)
87108

88109
async def _get_migration_status(self, model_name: str):
89-
"""
90-
Returns:
91-
- latest_version: the 'new' version
92-
- active_versions: versions marked 'dual' or 'new',
93-
or fallback to latest if none are active.
94-
"""
95-
# Ensure the connection is ip and running
96110
await self.ensure_connection()
111+
112+
client = await self.get_client()
97113

98114
latest = None
99115
active = []
100116

101117
all_versions = sorted(self.models[model_name].keys(), key=parse_version)
102118

103119
for version in all_versions:
104-
mode = await self.database.get(f"migration_mode:{version}")
120+
mode = await client.get(f"migration_mode:{version}")
105121
mode = decode_value(mode)
106122

107123
if mode == "new":

subvortex/core/file/file_monitor.py

Lines changed: 38 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -16,12 +16,11 @@
1616
# DEALINGS IN THE SOFTWARE.
1717
import asyncio
1818
import threading
19-
import bittensor.utils.btlogging as btul
2019
from enum import Enum
2120

21+
import bittensor.utils.btlogging as btul
2222
from subvortex.core.file.file_provider import FileProvider
2323

24-
2524
LOGGER_NAME = "File Monitoring"
2625

2726

@@ -32,21 +31,25 @@ class FileType(Enum):
3231

3332
class FileMonitor(threading.Thread):
3433
def __init__(self):
35-
super().__init__()
34+
super().__init__(daemon=True)
3635
self.stop_flag = threading.Event()
3736
self.last_error_shown = None
3837
self.coroutines = []
39-
self.loop = asyncio.new_event_loop()
38+
self.loop = None
4039

4140
def add_file_provider(self, file_provider: FileProvider):
42-
task = self.loop.create_task(self._check_file(file_provider))
43-
self.coroutines.append(task)
41+
if self.loop:
42+
task = asyncio.run_coroutine_threadsafe(
43+
self._check_file(file_provider), self.loop
44+
)
45+
self.coroutines.append(task)
4446

4547
async def _check_file(self, file: FileProvider):
46-
while not self.stop_flag.is_set():
47-
try:
48-
# Wait a specific time before starting
48+
try:
49+
while not self.stop_flag.is_set():
4950
await asyncio.sleep(file.check_interval)
51+
if self.stop_flag.is_set():
52+
break
5053

5154
btul.logging.debug(
5255
f"[{LOGGER_NAME}][{file.logger_name}] Checking file..."
@@ -70,45 +73,38 @@ async def _check_file(self, file: FileProvider):
7073

7174
# Reset the last error shown
7275
self.last_error_shown = None
73-
except Exception as err:
74-
error_message = f"[{LOGGER_NAME}][{file.logger_name}] Failed processing file: {err} {type(err)}"
75-
if error_message != self.last_error_shown:
76-
btul.logging.error(error_message)
77-
self.last_error_shown = error_message
78-
79-
async def _run_async(self):
80-
while not self.stop_flag.is_set():
81-
try:
82-
# Sleep for a second before gathering tasks
83-
await asyncio.sleep(1)
8476

85-
if self.stop_flag.is_set():
86-
# Time to stop the file monitoring
87-
# We wait until all the tasks are finished
88-
await asyncio.gather(*self.coroutines)
89-
except Exception as err:
90-
error_message = (
91-
f"[{LOGGER_NAME}] Failed checking files: {err} {type(err)}"
92-
)
93-
if error_message != self.last_error_shown:
94-
btul.logging.error(error_message)
95-
self.last_error_shown = error_message
77+
except Exception as err:
78+
error_message = f"[{LOGGER_NAME}][{file.logger_name}] Failed processing file: {err} {type(err)}"
79+
if error_message != self.last_error_shown:
80+
btul.logging.error(error_message)
81+
self.last_error_shown = error_message
9682

9783
def run(self):
84+
self.loop = asyncio.new_event_loop()
85+
asyncio.set_event_loop(self.loop)
86+
87+
async def monitor():
88+
await asyncio.Event().wait() # Wait forever unless externally cancelled
89+
9890
try:
99-
self.loop.run_until_complete(self._run_async())
91+
self.loop.run_until_complete(monitor())
92+
except Exception as err:
93+
btul.logging.error(f"[{LOGGER_NAME}] Loop error: {err}")
10094
finally:
101-
self.loop.stop()
102-
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
95+
pending = asyncio.all_tasks(loop=self.loop)
96+
for task in pending:
97+
task.cancel()
98+
self.loop.run_until_complete(
99+
asyncio.gather(*pending, return_exceptions=True)
100+
)
103101
self.loop.close()
104-
105-
btul.logging.debug(f"[{LOGGER_NAME}] run ended")
106-
107-
def start(self):
108-
super().start()
109-
btul.logging.debug(f"[{LOGGER_NAME}] started")
102+
btul.logging.debug(f"[{LOGGER_NAME}] Event loop closed")
110103

111104
def stop(self):
105+
btul.logging.info(f"[{LOGGER_NAME}] FileMonitor stopping")
112106
self.stop_flag.set()
113-
super().join()
114-
btul.logging.debug(f"[{LOGGER_NAME}] stopped")
107+
if self.loop:
108+
self.loop.call_soon_threadsafe(self.loop.stop)
109+
self.join()
110+
btul.logging.info(f"[{LOGGER_NAME}] FileMonitor stopped")

0 commit comments

Comments
 (0)