@@ -27,7 +27,7 @@ use graph::{
2727use graph_chain_ethereum as ethereum;
2828use graph_store_postgres:: { BlockStore , ChainHeadUpdateListener } ;
2929
30- use std:: { any:: Any , cmp:: Ordering , sync:: Arc , time:: Duration } ;
30+ use std:: { any:: Any , cmp:: Ordering , collections :: HashMap , sync:: Arc , time:: Duration } ;
3131
3232use crate :: chain:: {
3333 create_ethereum_networks, create_firehose_networks, networks_as_chains, AnyChainFilter ,
@@ -91,13 +91,14 @@ impl AdapterConfiguration {
9191}
9292
9393use graph_chain_ethereum:: health:: { health_check_task, Health } ;
94+ use tokio_util:: sync:: CancellationToken ;
9495
9596pub struct Networks {
9697 pub adapters : Vec < AdapterConfiguration > ,
9798 pub rpc_provider_manager : ProviderManager < EthereumNetworkAdapter > ,
9899 pub firehose_provider_manager : ProviderManager < Arc < FirehoseEndpoint > > ,
99100 pub weighted_rpc_steering : bool ,
100- pub health_checkers : Vec < Arc < Health > > ,
101+ pub health_checkers : HashMap < ChainName , Vec < Arc < Health > > > ,
101102}
102103
103104impl Networks {
@@ -116,7 +117,7 @@ impl Networks {
116117 ProviderCheckStrategy :: MarkAsValid ,
117118 ) ,
118119 weighted_rpc_steering : false ,
119- health_checkers : vec ! [ ] ,
120+ health_checkers : HashMap :: new ( ) ,
120121 }
121122 }
122123
@@ -260,13 +261,20 @@ impl Networks {
260261 } ,
261262 ) ;
262263
263- let health_checkers: Vec < _ > = eth_adapters
264+ let health_checkers: HashMap < ChainName , Vec < Arc < Health > > > = eth_adapters
264265 . clone ( )
265- . flat_map ( |( _, adapters) | adapters)
266- . map ( |adapter| Arc :: new ( Health :: new ( adapter. adapter . clone ( ) ) ) )
266+ . map ( |( chain_id, adapters) | {
267+ let checkers = adapters
268+ . iter ( )
269+ . map ( |a| Arc :: new ( Health :: new ( a. adapter ( ) . clone ( ) ) ) )
270+ . collect ( ) ;
271+ ( chain_id, checkers)
272+ } )
267273 . collect ( ) ;
268274 if weighted_rpc_steering {
269- tokio:: spawn ( health_check_task ( health_checkers. clone ( ) ) ) ;
275+ let cancel_token = CancellationToken :: new ( ) ;
276+ let all: Vec < _ > = health_checkers. values ( ) . flatten ( ) . cloned ( ) . collect ( ) ;
277+ tokio:: spawn ( health_check_task ( all, cancel_token) ) ;
270278 }
271279
272280 let firehose_adapters = adapters
@@ -388,13 +396,22 @@ impl Networks {
388396 . flat_map ( |eth_c| eth_c. call_only . clone ( ) )
389397 . collect_vec ( ) ;
390398
399+ let chain_checkers: std:: collections:: HashMap < String , Arc < Health > > = self
400+ . health_checkers
401+ . get ( & chain_id)
402+ . cloned ( )
403+ . unwrap_or_default ( )
404+ . into_iter ( )
405+ . map ( |h| ( h. provider ( ) . to_string ( ) , h) )
406+ . collect ( ) ;
407+
391408 EthereumNetworkAdapters :: new (
392409 chain_id,
393410 self . rpc_provider_manager . clone ( ) ,
394411 eth_adapters,
395412 None ,
396413 self . weighted_rpc_steering ,
397- self . health_checkers . clone ( ) ,
414+ chain_checkers ,
398415 )
399416 }
400417}
0 commit comments