File tree Expand file tree Collapse file tree 4 files changed +43
-8
lines changed
Expand file tree Collapse file tree 4 files changed +43
-8
lines changed Original file line number Diff line number Diff line change @@ -50,14 +50,14 @@ async def wait_until_ready(self, name: str):
5050 # Step 1: check the message key first
5151 snapshot = await self .database .get (message_key )
5252 if snapshot and snapshot .decode () == "ready" :
53- btul .logging .info (
53+ btul .logging .trace (
5454 f"{ name } is already ready (via message key)" ,
5555 prefix = self .settings .logging_name ,
5656 )
5757 return
5858
5959 # Step 2: wait for stream messages
60- btul .logging .debug (
60+ btul .logging .trace (
6161 f"Waiting on stream: { stream_key } " , prefix = self .settings .logging_name
6262 )
6363 while True :
@@ -66,14 +66,14 @@ async def wait_until_ready(self, name: str):
6666 continue
6767
6868 for stream_key , messages in entries :
69- btul .logging .debug (
69+ btul .logging .trace (
7070 f"Received stream message: { messages } " ,
7171 prefix = self .settings .logging_name ,
7272 )
7373 for msg_id , fields in messages :
7474 state = fields .get ("state" .encode (), b"" ).decode ()
7575 if state == "ready" :
76- btul .logging .info (
76+ btul .logging .trace (
7777 f"{ name } is now ready (via stream)" ,
7878 prefix = self .settings .logging_name ,
7979 )
Original file line number Diff line number Diff line change @@ -135,6 +135,12 @@ async def start(self):
135135 )
136136
137137 finally :
138+ # Flag metagraph as unready and notify
139+ if not self .settings .dry_run :
140+ await self .database .mark_as_unready ()
141+ await self .database .notify_state ()
142+
143+ # Signal run completed
138144 self .run_complete .set ()
139145 btul .logging .info (
140146 "🛑 MetagraphObserver service exiting..." ,
@@ -345,6 +351,21 @@ async def _notify_if_needed(self, ready):
345351
346352 return True
347353
354+ async def _notify_if_needed (self , ready ):
355+ if ready :
356+ return ready
357+
358+ btul .logging .debug (
359+ "🔔 Metagraph marked ready" , prefix = self .settings .logging_name
360+ )
361+ not self .settings .dry_run and await self .database .mark_as_ready ()
362+ btul .logging .debug (
363+ "📣 Broadcasting metagraph ready state" , prefix = self .settings .logging_name
364+ )
365+ not self .settings .dry_run and await self .database .notify_state ()
366+
367+ return True
368+
348369 async def _has_new_neuron_registered (self , registration_count ) -> tuple [bool , int ]:
349370 new_count = await scbs .get_number_of_registration (
350371 subtensor = self .subtensor , netuid = self .settings .netuid
Original file line number Diff line number Diff line change @@ -181,10 +181,10 @@ async def _initialize(self):
181181 await self .subtensor .initialize ()
182182
183183 # Initialize database
184- btul .logging .info ("Waiting for database readiness..." )
184+ btul .logging .info ("Waiting for metagraph readiness..." )
185185 self .database = Database (settings = self .settings )
186186 await self .database .wait_until_ready ("metagraph" )
187- btul .logging .info ("Database is ready." )
187+ btul .logging .info ("Metagraph is ready." )
188188
189189 # Get the list of neurons
190190 self .neurons = await self .database .get_neurons ()
@@ -263,6 +263,10 @@ async def _main_loop(self):
263263 current_block = await self .subtensor .get_current_block ()
264264 btul .logging .debug (f"Block #{ current_block } " )
265265
266+ # Ensure the metagraph is ready
267+ btul .logging .debug ("Ensure metagraph readiness" )
268+ await self .database .wait_until_ready ("metagraph" )
269+
266270 # Get the last time the neurons have been updated
267271 last_updated = await self .database .get_neuron_last_updated ()
268272 if self .previous_last_updated != last_updated :
Original file line number Diff line number Diff line change @@ -172,7 +172,9 @@ async def run(self):
172172 await check_redis_connection (port = self .settings .database_port )
173173
174174 # Wait until the metagraph is ready
175+ btul .logging .info ("Waiting for metagraph readiness..." )
175176 await self .database .wait_until_ready ("metagraph" )
177+ btul .logging .info ("Metagraph is ready." )
176178
177179 # File monitor
178180 self .file_monitor = FileMonitor ()
@@ -218,6 +220,10 @@ async def run(self):
218220 current_block = 0
219221 while not self .should_exit .is_set ():
220222 try :
223+ # Ensure the metagraph is ready
224+ btul .logging .debug ("Ensure metagraph readiness" )
225+ await self .database .wait_until_ready ("metagraph" )
226+
221227 # Get the last time neurons have been updated
222228 last_updated = await self .database .get_neuron_last_updated ()
223229 if last_updated == 0 :
@@ -270,8 +276,12 @@ async def run(self):
270276 )
271277
272278 # Get the miners with no ips
273- miners_not_serving = [x .uid for x in self .miners if x .ip == "0.0.0.0" ]
274- btul .logging .debug (f"Miners not serving (not selectable): { miners_not_serving } " )
279+ miners_not_serving = [
280+ x .uid for x in self .miners if x .ip == "0.0.0.0"
281+ ]
282+ btul .logging .debug (
283+ f"Miners not serving (not selectable): { miners_not_serving } "
284+ )
275285
276286 # Build the list of uids reset
277287 uids_reset = np .flatnonzero (
You can’t perform that action at this time.
0 commit comments