@@ -117,7 +117,7 @@ use lightning::routing::utxo::UtxoLookup;
117117use lightning:: util:: config:: { ChannelHandshakeConfig , ChannelHandshakeLimits , UserConfig } ;
118118use lightning:: util:: ser:: ReadableArgs ;
119119
120- use lightning_background_processor:: BackgroundProcessor ;
120+ use lightning_background_processor:: process_events_async ;
121121use lightning_background_processor:: GossipSync as BPGossipSync ;
122122
123123use lightning_transaction_sync:: EsploraSyncClient ;
@@ -583,9 +583,7 @@ impl Builder {
583583/// upon [`Node::stop()`].
584584struct Runtime {
585585 tokio_runtime : Arc < tokio:: runtime:: Runtime > ,
586- _background_processor : BackgroundProcessor ,
587- stop_networking : Arc < AtomicBool > ,
588- stop_wallet_sync : Arc < AtomicBool > ,
586+ stop_runtime : Arc < AtomicBool > ,
589587}
590588
591589/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
@@ -640,11 +638,10 @@ impl Node {
640638
641639 let runtime = run_lock. as_ref ( ) . unwrap ( ) ;
642640
643- // Stop wallet sync
644- runtime. stop_wallet_sync . store ( true , Ordering :: Release ) ;
641+ // Stop the runtime.
642+ runtime. stop_runtime . store ( true , Ordering :: Release ) ;
645643
646- // Stop networking
647- runtime. stop_networking . store ( true , Ordering :: Release ) ;
644+ // Stop disconnect peers.
648645 self . peer_manager . disconnect_all_peers ( ) ;
649646
650647 // Drop the held runtimes.
@@ -661,6 +658,8 @@ impl Node {
661658
662659 self . wallet . set_runtime ( Arc :: clone ( & tokio_runtime) ) ;
663660
661+ let stop_runtime = Arc :: new ( AtomicBool :: new ( false ) ) ;
662+
664663 let event_handler = Arc :: new ( EventHandler :: new (
665664 Arc :: clone ( & self . wallet ) ,
666665 Arc :: clone ( & self . event_queue ) ,
@@ -679,8 +678,7 @@ impl Node {
679678 let sync_cman = Arc :: clone ( & self . channel_manager ) ;
680679 let sync_cmon = Arc :: clone ( & self . chain_monitor ) ;
681680 let sync_logger = Arc :: clone ( & self . logger ) ;
682- let stop_wallet_sync = Arc :: new ( AtomicBool :: new ( false ) ) ;
683- let stop_sync = Arc :: clone ( & stop_wallet_sync) ;
681+ let stop_sync = Arc :: clone ( & stop_runtime) ;
684682
685683 std:: thread:: spawn ( move || {
686684 tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on (
@@ -711,7 +709,7 @@ impl Node {
711709 } ) ;
712710
713711 let sync_logger = Arc :: clone ( & self . logger ) ;
714- let stop_sync = Arc :: clone ( & stop_wallet_sync ) ;
712+ let stop_sync = Arc :: clone ( & stop_runtime ) ;
715713 tokio_runtime. spawn ( async move {
716714 loop {
717715 if stop_sync. load ( Ordering :: Acquire ) {
@@ -736,11 +734,10 @@ impl Node {
736734 }
737735 } ) ;
738736
739- let stop_networking = Arc :: new ( AtomicBool :: new ( false ) ) ;
740737 if let Some ( listening_address) = & self . config . listening_address {
741738 // Setup networking
742739 let peer_manager_connection_handler = Arc :: clone ( & self . peer_manager ) ;
743- let stop_listen = Arc :: clone ( & stop_networking ) ;
740+ let stop_listen = Arc :: clone ( & stop_runtime ) ;
744741 let listening_address = listening_address. clone ( ) ;
745742
746743 tokio_runtime. spawn ( async move {
@@ -770,7 +767,7 @@ impl Node {
770767 let connect_pm = Arc :: clone ( & self . peer_manager ) ;
771768 let connect_logger = Arc :: clone ( & self . logger ) ;
772769 let connect_peer_store = Arc :: clone ( & self . peer_store ) ;
773- let stop_connect = Arc :: clone ( & stop_networking ) ;
770+ let stop_connect = Arc :: clone ( & stop_runtime ) ;
774771 tokio_runtime. spawn ( async move {
775772 let mut interval = tokio:: time:: interval ( PEER_RECONNECTION_INTERVAL ) ;
776773 loop {
@@ -803,19 +800,45 @@ impl Node {
803800 } ) ;
804801
805802 // Setup background processing
806- let _background_processor = BackgroundProcessor :: start (
807- Arc :: clone ( & self . kv_store ) ,
808- Arc :: clone ( & event_handler) ,
809- Arc :: clone ( & self . chain_monitor ) ,
810- Arc :: clone ( & self . channel_manager ) ,
811- BPGossipSync :: p2p ( Arc :: clone ( & self . gossip_sync ) ) ,
812- Arc :: clone ( & self . peer_manager ) ,
813- Arc :: clone ( & self . logger ) ,
814- Some ( Arc :: clone ( & self . scorer ) ) ,
815- ) ;
803+ let background_persister = Arc :: clone ( & self . kv_store ) ;
804+ let background_event_handler = Arc :: clone ( & event_handler) ;
805+ let background_chain_mon = Arc :: clone ( & self . chain_monitor ) ;
806+ let background_chan_man = Arc :: clone ( & self . channel_manager ) ;
807+ let background_gossip_sync = BPGossipSync :: p2p ( Arc :: clone ( & self . gossip_sync ) ) ;
808+ let background_peer_man = Arc :: clone ( & self . peer_manager ) ;
809+ let background_logger = Arc :: clone ( & self . logger ) ;
810+ let background_scorer = Arc :: clone ( & self . scorer ) ;
811+ let stop_background_processing = Arc :: clone ( & stop_runtime) ;
812+ let sleeper = move |d| {
813+ let stop = Arc :: clone ( & stop_background_processing) ;
814+ Box :: pin ( async move {
815+ if stop. load ( Ordering :: Acquire ) {
816+ true
817+ } else {
818+ tokio:: time:: sleep ( d) . await ;
819+ false
820+ }
821+ } )
822+ } ;
823+
824+ tokio_runtime. spawn ( async move {
825+ process_events_async (
826+ background_persister,
827+ |e| background_event_handler. handle_event ( e) ,
828+ background_chain_mon,
829+ background_chan_man,
830+ background_gossip_sync,
831+ background_peer_man,
832+ background_logger,
833+ Some ( background_scorer) ,
834+ sleeper,
835+ true ,
836+ )
837+ . await
838+ . expect ( "Failed to process events" ) ;
839+ } ) ;
816840
817- // TODO: frequently check back on background_processor if there was an error
818- Ok ( Runtime { tokio_runtime, _background_processor, stop_networking, stop_wallet_sync } )
841+ Ok ( Runtime { tokio_runtime, stop_runtime } )
819842 }
820843
821844 /// Blocks until the next event is available.
0 commit comments