diff --git a/node/connection/src/lib.rs b/node/connection/src/lib.rs index 426b7c5..83a2cc1 100644 --- a/node/connection/src/lib.rs +++ b/node/connection/src/lib.rs @@ -123,7 +123,6 @@ pub async fn start_listen( loop { // Asynchronously wait for an inbound TcpStream. log::info!("Starting accept"); - notifier.notify_one(); match listener.accept().await { Ok((stream, peer_address)) => { log::info!("Accepted connection from {:?}", peer_address); @@ -210,7 +209,8 @@ where if handle_received(writer, msg).await.is_err() { return Err("Message handling failure: Closing peer connection".into()); } else { - notifier.notified(); + log::debug!("Message handled"); + notifier.notify_one(); } }, Ok(msg_bytes) = send_to_all_receiver.recv() => { @@ -356,29 +356,26 @@ mod tests { let (send_to_all_tx, send_to_all_rx) = broadcast::channel::(32); - let notify = Arc::new(Notify::new()); - let notify_listen_cloned = notify.clone(); - let notify_connect_cloned = notify.clone(); - tokio::spawn(async move { start_listen( - "localhost:6680".to_string(), + "127.0.0.1:6680".to_string(), listen_manager_cloned, 32, send_to_all_tx, - notify_listen_cloned, + Arc::new(Notify::new()), ) .await; }); - notify.notified().await; + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); let _ = connect( - "localhost:6680".to_string(), + "127.0.0.1:6680".to_string(), connect_manager_cloned, 32, send_to_all_rx, - notify_connect_cloned, + notify_clone, ) .await;