From ffa1fe9dfd55188d602cbfbb6553a3934e3b9b3f Mon Sep 17 00:00:00 2001 From: Nickolay Ilyushin Date: Mon, 28 Oct 2024 20:45:09 +0200 Subject: [PATCH 1/5] Implement plugin support for resolving --- Cargo.toml | 1 + src/lib.rs | 5 ++ src/plugin.rs | 12 ++++ src/service_daemon.rs | 148 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 src/plugin.rs diff --git a/Cargo.toml b/Cargo.toml index b10befa..c67b07f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ description = "mDNS Service Discovery library with no async runtime dependency" async = ["flume/async"] logging = ["log"] default = ["async", "logging"] +plugins = [] [dependencies] flume = { version = "0.11", default-features = false } # channel between threads diff --git a/src/lib.rs b/src/lib.rs index 9f1af4d..22bc28a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -150,6 +150,8 @@ mod dns_parser; mod error; mod service_daemon; mod service_info; +#[cfg(feature = "plugins")] +mod plugin; pub use error::{Error, Result}; pub use service_daemon::{ @@ -158,5 +160,8 @@ pub use service_daemon::{ }; pub use service_info::{AsIpAddrs, IntoTxtProperties, ServiceInfo, TxtProperties, TxtProperty}; +#[cfg(feature = "plugins")] +pub use plugin::PluginCommand; + /// A handler to receive messages from [ServiceDaemon]. Re-export from `flume` crate. pub use flume::Receiver; diff --git a/src/plugin.rs b/src/plugin.rs new file mode 100644 index 0000000..b35b964 --- /dev/null +++ b/src/plugin.rs @@ -0,0 +1,12 @@ +use std::collections::HashMap; +use crate::ServiceInfo; +use flume::Sender; + +/// Commands to be implemented by plugins +#[derive(Debug)] +pub enum PluginCommand { + /// Command to fetch services that are currently provided by the plugin + ListServices(Sender>), + + Exit(Sender<()>), +} diff --git a/src/service_daemon.rs b/src/service_daemon.rs index dc6fae8..b6eb48e 100644 --- a/src/service_daemon.rs +++ b/src/service_daemon.rs @@ -30,6 +30,8 @@ // corresponds to a set of DNS Resource Records. #[cfg(feature = "logging")] use crate::log::{debug, error, warn}; +#[cfg(feature = "plugins")] +use crate::plugin::PluginCommand; use crate::{ dns_cache::DnsCache, dns_parser::{ @@ -278,6 +280,22 @@ impl ServiceDaemon { self.send_cmd(Command::StopResolveHostname(hostname.to_string())) } + /// Registers a plugin provided by the library consumer, to support dynamic mDNS resolution. + /// + /// Please be aware that this resolution should be relatively consistent, e.g. configured + /// externally. + /// + /// If feature `plugins` is enabled, the daemon will send requests to the plugins + /// using the flume channel that will be sent to `pc_recv`. + /// + /// Please note that enabling the feature enables fetching the plugin-provided services + /// on *every* request, so this is disabled by default due to extra overhead. + #[cfg(feature = "plugins")] + pub fn register_plugin(&self, name: String, pc_recv: Sender>) -> Result<()> { + self.send_cmd(Command::RegisterPlugin(name, pc_recv)) + } + + /// Registers a service provided by this host. /// /// If `service_info` has no addresses yet and its `addr_auto` is enabled, @@ -292,6 +310,7 @@ impl ServiceDaemon { self.send_cmd(Command::Register(service_info)) } + /// Unregisters a service. This is a graceful shutdown of a service. /// /// Returns a channel receiver that is used to receive the status code @@ -401,9 +420,27 @@ impl ServiceDaemon { fn daemon_thread(signal_sock: UdpSocket, poller: Poller, receiver: Receiver) { let zc = Zeroconf::new(signal_sock, poller); + #[cfg(feature = "plugins")] + let plugin_senders = zc.plugin_senders.clone(); + if let Some(cmd) = Self::run(zc, receiver) { match cmd { Command::Exit(resp_s) => { + #[cfg(feature = "plugins")] + for (plugin, sender) in plugin_senders.clone() { + let (p_send, p_recv) = bounded(1); + + match sender.send(PluginCommand::Exit(p_send)) { + Ok(()) => {}, + Err(e) => error!("failed to send plugin exit command: {}, {}", plugin, e) + }; + + match p_recv.recv() { + Ok(()) => debug!("plugin {} exited successfully", plugin), + Err(e) => error!("plugin {} failed to exit: {}", plugin, e) + } + } + // It is guaranteed that the receiver already dropped, // i.e. the daemon command channel closed. if let Err(e) = resp_s.send(DaemonStatus::Shutdown) { @@ -658,6 +695,11 @@ impl ServiceDaemon { zc.monitors.push(resp_s); } + #[cfg(feature = "plugins")] + Command::RegisterPlugin(name, resp_s) => { + zc.register_plugin(name, resp_s); + } + Command::SetOption(daemon_opt) => { zc.process_set_option(daemon_opt); } @@ -930,6 +972,9 @@ struct Zeroconf { /// Service instances that are pending for resolving SRV and TXT. pending_resolves: HashSet, + + #[cfg(feature = "plugins")] + plugin_senders: HashMap>, } impl Zeroconf { @@ -979,6 +1024,8 @@ impl Zeroconf { timers, status, pending_resolves: HashSet::new(), + #[cfg(feature = "plugins")] + plugin_senders: HashMap::new(), } } @@ -1875,12 +1922,26 @@ impl Zeroconf { // See https://datatracker.ietf.org/doc/html/rfc6763#section-9 const META_QUERY: &str = "_services._dns-sd._udp.local."; + let mut all_services = HashMap::new(); + + for (k, v) in &self.my_services { + all_services.insert(k, v); + } + + let services_by_plugins = self.list_plugin_services(); + + for (_plugin, services) in &services_by_plugins { + for (k, v) in services { + all_services.insert(k, v); + } + } + for question in msg.questions.iter() { debug!("query question: {:?}", &question); let qtype = question.entry.ty; if qtype == TYPE_PTR { - for service in self.my_services.values() { + for service in all_services.values() { if question.entry.name == service.get_type() || service .get_subtype() @@ -1906,7 +1967,7 @@ impl Zeroconf { } } else { if qtype == TYPE_A || qtype == TYPE_AAAA || qtype == TYPE_ANY { - for service in self.my_services.values() { + for service in all_services.values() { if service.get_hostname().to_lowercase() == question.entry.name.to_lowercase() { @@ -1940,7 +2001,7 @@ impl Zeroconf { } let name_to_find = question.entry.name.to_lowercase(); - let service = match self.my_services.get(&name_to_find) { + let service = match all_services.get(&name_to_find) { Some(s) => s, None => continue, }; @@ -2315,6 +2376,82 @@ impl Zeroconf { self.increase_counter(Counter::CacheRefreshSRV, query_srv_count); self.increase_counter(Counter::CacheRefreshAddr, query_addr_count); } + + // Returns (Plugin, Map) + #[cfg(feature = "plugins")] + fn list_plugin_services(&self) -> Vec<(String, HashMap)> { + let mut output = vec![]; + + for key in self.plugin_senders.keys() { + output.push((key.clone(), self.list_plugin_services_for(key))); + } + + output + } + + #[cfg(not(feature = "plugins"))] + fn list_plugin_services(&self) -> Vec<(String, HashMap)> { + vec![] + } + + #[cfg(feature = "plugins")] + fn list_plugin_services_for(&self, plugin: &str) -> HashMap { + let (r_send, r_recv) = bounded(1); + + let p_send = match self.plugin_senders.get(plugin) { + None => { + warn!("Could not find plugin {}", plugin); + + return HashMap::new(); + }, + Some(p_send) => p_send, + }; + + match p_send.send(PluginCommand::ListServices(r_send)) { + Ok(()) => {}, + Err(e) => warn!("Failed to send ListServices command: {}", e), + } + + r_recv.recv().unwrap_or_else(|e| { + warn!("Could not receive service list: {}", e); + + HashMap::new() + }) + } + + #[cfg(feature = "plugins")] + fn register_plugin(&mut self, name: String, resp_s: Sender>) { + let (send, recv) = bounded(100); + + match resp_s.send(recv) { + Ok(()) => debug!("Registered a plugin"), + Err(e) => { + error!("Failed to send registered plugin's receive handle: {}", e); + + return; + } + } + + if self.plugin_senders.contains_key(&name) { + let old_send = self.plugin_senders.get(&name).unwrap(); + + let (exit_send, exit_recv) = bounded(1); + + match old_send.send(PluginCommand::Exit(exit_send)) { + Ok(()) => debug!("Requested old plugin exit"), + Err(e) => warn!("Failed to send exit command to a plugin: {}", e) + } + + match exit_recv.recv_timeout(Duration::from_secs(1)) { + Ok(()) => debug!("The old plugin exited"), + Err(e) => warn!("Old plugin's exit timed out: {}", e) + } + } + + debug!("Registered a new plugin: {}", name); + + self.plugin_senders.insert(name, send); + } } /// All possible events sent to the client from the daemon @@ -2410,6 +2547,9 @@ enum Command { SetOption(DaemonOption), + #[cfg(feature = "plugins")] + RegisterPlugin(String, Sender>), + Exit(Sender), } @@ -2429,6 +2569,8 @@ impl fmt::Display for Command { Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"), Self::Unregister(_, _) => write!(f, "Command Unregister"), Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"), + #[cfg(feature = "plugins")] + Self::RegisterPlugin(name, _) => write!(f, "Command RegisterPlugin: {}", name), Self::Resolve(_, _) => write!(f, "Command Resolve"), } } From fb49d6936617ed54cfd960295799be762f1d2d08 Mon Sep 17 00:00:00 2001 From: Nickolay Ilyushin Date: Mon, 28 Oct 2024 21:01:13 +0200 Subject: [PATCH 2/5] Reformat plugin-specific code --- src/lib.rs | 4 ++-- src/plugin.rs | 2 +- src/service_daemon.rs | 24 ++++++++++++++---------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 22bc28a..e229b33 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -148,10 +148,10 @@ mod log { mod dns_cache; mod dns_parser; mod error; -mod service_daemon; -mod service_info; #[cfg(feature = "plugins")] mod plugin; +mod service_daemon; +mod service_info; pub use error::{Error, Result}; pub use service_daemon::{ diff --git a/src/plugin.rs b/src/plugin.rs index b35b964..8cc0be3 100644 --- a/src/plugin.rs +++ b/src/plugin.rs @@ -1,6 +1,6 @@ -use std::collections::HashMap; use crate::ServiceInfo; use flume::Sender; +use std::collections::HashMap; /// Commands to be implemented by plugins #[derive(Debug)] diff --git a/src/service_daemon.rs b/src/service_daemon.rs index b6eb48e..850c125 100644 --- a/src/service_daemon.rs +++ b/src/service_daemon.rs @@ -291,11 +291,14 @@ impl ServiceDaemon { /// Please note that enabling the feature enables fetching the plugin-provided services /// on *every* request, so this is disabled by default due to extra overhead. #[cfg(feature = "plugins")] - pub fn register_plugin(&self, name: String, pc_recv: Sender>) -> Result<()> { + pub fn register_plugin( + &self, + name: String, + pc_recv: Sender>, + ) -> Result<()> { self.send_cmd(Command::RegisterPlugin(name, pc_recv)) } - /// Registers a service provided by this host. /// /// If `service_info` has no addresses yet and its `addr_auto` is enabled, @@ -310,7 +313,6 @@ impl ServiceDaemon { self.send_cmd(Command::Register(service_info)) } - /// Unregisters a service. This is a graceful shutdown of a service. /// /// Returns a channel receiver that is used to receive the status code @@ -431,13 +433,15 @@ impl ServiceDaemon { let (p_send, p_recv) = bounded(1); match sender.send(PluginCommand::Exit(p_send)) { - Ok(()) => {}, - Err(e) => error!("failed to send plugin exit command: {}, {}", plugin, e) + Ok(()) => {} + Err(e) => { + error!("failed to send plugin exit command: {}, {}", plugin, e) + } }; match p_recv.recv() { Ok(()) => debug!("plugin {} exited successfully", plugin), - Err(e) => error!("plugin {} failed to exit: {}", plugin, e) + Err(e) => error!("plugin {} failed to exit: {}", plugin, e), } } @@ -2403,12 +2407,12 @@ impl Zeroconf { warn!("Could not find plugin {}", plugin); return HashMap::new(); - }, + } Some(p_send) => p_send, }; match p_send.send(PluginCommand::ListServices(r_send)) { - Ok(()) => {}, + Ok(()) => {} Err(e) => warn!("Failed to send ListServices command: {}", e), } @@ -2439,12 +2443,12 @@ impl Zeroconf { match old_send.send(PluginCommand::Exit(exit_send)) { Ok(()) => debug!("Requested old plugin exit"), - Err(e) => warn!("Failed to send exit command to a plugin: {}", e) + Err(e) => warn!("Failed to send exit command to a plugin: {}", e), } match exit_recv.recv_timeout(Duration::from_secs(1)) { Ok(()) => debug!("The old plugin exited"), - Err(e) => warn!("Old plugin's exit timed out: {}", e) + Err(e) => warn!("Old plugin's exit timed out: {}", e), } } From c891f4d294c4031e8302650c4d51856757578bb0 Mon Sep 17 00:00:00 2001 From: Nickolay Ilyushin Date: Tue, 29 Oct 2024 19:33:56 +0200 Subject: [PATCH 3/5] Implement tests, refactor parts of plugin-specific code for laziness purpose --- .github/workflows/build.yml | 4 +- src/plugin.rs | 5 +- src/service_daemon.rs | 68 ++++++++++++------------ tests/mdns_test.rs | 100 ++++++++++++++++++++++++++++++++++-- 4 files changed, 136 insertions(+), 41 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index aa6cae3..98d165a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -32,7 +32,7 @@ jobs: run: cargo clippy -- -D warnings - name: Run tests with debugs if: matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' - run: RUST_LOG=debug cargo test --verbose + run: RUST_LOG=debug cargo test --verbose --features plugins - name: Run tests on Windows if: matrix.os == 'windows-latest' - run: cargo test + run: cargo test --features plugins diff --git a/src/plugin.rs b/src/plugin.rs index 8cc0be3..71d0d55 100644 --- a/src/plugin.rs +++ b/src/plugin.rs @@ -1,12 +1,15 @@ use crate::ServiceInfo; use flume::Sender; use std::collections::HashMap; +use std::sync::Arc; /// Commands to be implemented by plugins #[derive(Debug)] pub enum PluginCommand { + Registered, + /// Command to fetch services that are currently provided by the plugin - ListServices(Sender>), + ListServices(Sender>>), Exit(Sender<()>), } diff --git a/src/service_daemon.rs b/src/service_daemon.rs index 850c125..93062a5 100644 --- a/src/service_daemon.rs +++ b/src/service_daemon.rs @@ -58,6 +58,8 @@ use std::{ time::Duration, vec, }; +use std::cell::LazyCell; +use std::sync::Arc; /// A simple macro to report all kinds of errors. macro_rules! e_fmt { @@ -286,7 +288,7 @@ impl ServiceDaemon { /// externally. /// /// If feature `plugins` is enabled, the daemon will send requests to the plugins - /// using the flume channel that will be sent to `pc_recv`. + /// using the flume channel sender for which needs to be provided as `pc_send`. /// /// Please note that enabling the feature enables fetching the plugin-provided services /// on *every* request, so this is disabled by default due to extra overhead. @@ -294,9 +296,9 @@ impl ServiceDaemon { pub fn register_plugin( &self, name: String, - pc_recv: Sender>, + pc_send: Sender, ) -> Result<()> { - self.send_cmd(Command::RegisterPlugin(name, pc_recv)) + self.send_cmd(Command::RegisterPlugin(name, pc_send)) } /// Registers a service provided by this host. @@ -700,8 +702,8 @@ impl ServiceDaemon { } #[cfg(feature = "plugins")] - Command::RegisterPlugin(name, resp_s) => { - zc.register_plugin(name, resp_s); + Command::RegisterPlugin(name, papi_send) => { + zc.register_plugin(name, papi_send); } Command::SetOption(daemon_opt) => { @@ -1926,26 +1928,30 @@ impl Zeroconf { // See https://datatracker.ietf.org/doc/html/rfc6763#section-9 const META_QUERY: &str = "_services._dns-sd._udp.local."; - let mut all_services = HashMap::new(); - - for (k, v) in &self.my_services { - all_services.insert(k, v); - } - let services_by_plugins = self.list_plugin_services(); - for (_plugin, services) in &services_by_plugins { - for (k, v) in services { + let all_services_cell = LazyCell::new(|| { + let mut all_services: HashMap<&String, &ServiceInfo> = HashMap::new(); + + for (k, v) in &self.my_services { all_services.insert(k, v); } - } + + for (_plugin, services) in &services_by_plugins { + for (k, v) in services { + all_services.insert(k, v); + } + } + + all_services + }); for question in msg.questions.iter() { debug!("query question: {:?}", &question); let qtype = question.entry.ty; if qtype == TYPE_PTR { - for service in all_services.values() { + for service in (*all_services_cell).values() { if question.entry.name == service.get_type() || service .get_subtype() @@ -1971,7 +1977,7 @@ impl Zeroconf { } } else { if qtype == TYPE_A || qtype == TYPE_AAAA || qtype == TYPE_ANY { - for service in all_services.values() { + for service in (*all_services_cell).values() { if service.get_hostname().to_lowercase() == question.entry.name.to_lowercase() { @@ -2005,7 +2011,7 @@ impl Zeroconf { } let name_to_find = question.entry.name.to_lowercase(); - let service = match all_services.get(&name_to_find) { + let service = match (*all_services_cell).get(&name_to_find) { Some(s) => s, None => continue, }; @@ -2383,7 +2389,7 @@ impl Zeroconf { // Returns (Plugin, Map) #[cfg(feature = "plugins")] - fn list_plugin_services(&self) -> Vec<(String, HashMap)> { + fn list_plugin_services(&self) -> Vec<(String, HashMap>)> { let mut output = vec![]; for key in self.plugin_senders.keys() { @@ -2394,12 +2400,12 @@ impl Zeroconf { } #[cfg(not(feature = "plugins"))] - fn list_plugin_services(&self) -> Vec<(String, HashMap)> { + fn list_plugin_services(&self) -> Vec<(String, HashMap>)> { vec![] } #[cfg(feature = "plugins")] - fn list_plugin_services_for(&self, plugin: &str) -> HashMap { + fn list_plugin_services_for(&self, plugin: &str) -> HashMap> { let (r_send, r_recv) = bounded(1); let p_send = match self.plugin_senders.get(plugin) { @@ -2424,18 +2430,7 @@ impl Zeroconf { } #[cfg(feature = "plugins")] - fn register_plugin(&mut self, name: String, resp_s: Sender>) { - let (send, recv) = bounded(100); - - match resp_s.send(recv) { - Ok(()) => debug!("Registered a plugin"), - Err(e) => { - error!("Failed to send registered plugin's receive handle: {}", e); - - return; - } - } - + fn register_plugin(&mut self, name: String, papi_send: Sender) { if self.plugin_senders.contains_key(&name) { let old_send = self.plugin_senders.get(&name).unwrap(); @@ -2454,7 +2449,12 @@ impl Zeroconf { debug!("Registered a new plugin: {}", name); - self.plugin_senders.insert(name, send); + self.plugin_senders.insert(name, papi_send.clone()); + + match papi_send.send(PluginCommand::Registered) { + Ok(()) => {}, + Err(e) => warn!("Failed to send a registration notification to a plugin: {}", e), + }; } } @@ -2552,7 +2552,7 @@ enum Command { SetOption(DaemonOption), #[cfg(feature = "plugins")] - RegisterPlugin(String, Sender>), + RegisterPlugin(String, Sender), Exit(Sender), } diff --git a/tests/mdns_test.rs b/tests/mdns_test.rs index 620f53b..a0da5bb 100644 --- a/tests/mdns_test.rs +++ b/tests/mdns_test.rs @@ -1,12 +1,17 @@ use if_addrs::{IfAddr, Interface}; -use mdns_sd::{ - DaemonEvent, DaemonStatus, HostnameResolutionEvent, IfKind, IntoTxtProperties, ServiceDaemon, - ServiceEvent, ServiceInfo, UnregisterStatus, -}; +use mdns_sd::{DaemonEvent, DaemonStatus, HostnameResolutionEvent, IfKind, IntoTxtProperties, ServiceDaemon, ServiceEvent, ServiceInfo, UnregisterStatus}; +#[cfg(feature = "plugins")] +use mdns_sd::{PluginCommand}; use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +#[cfg(feature = "plugins")] +use std::sync::Arc; use std::thread::sleep; +#[cfg(feature = "plugins")] +use std::thread::spawn; use std::time::{Duration, SystemTime}; +#[cfg(feature = "plugins")] +use flume::bounded; // use test_log::test; // commented out for debugging a flaky test in CI. /// This test covers: @@ -1456,6 +1461,93 @@ fn test_domain_suffix_in_browse() { mdns_client.shutdown().unwrap(); } +#[test] +#[cfg(feature = "plugins")] +fn plugin_support_test() { + let mdns_server = ServiceDaemon::new().expect("failed to create mdns server"); + let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client"); + + let mut ips = vec![]; + + for i in my_ip_interfaces() { + mdns_server.enable_interface(&i.name).unwrap(); + mdns_client.enable_interface(&i.name).unwrap(); + ips.push(i.ip().to_string()); + } + + let (papi_send, papi_recv) = bounded(100); + + mdns_server.register_plugin("test".to_string(), papi_send).expect("failed to register plugin"); + + let cmd_registered = papi_recv.recv().expect("failed to receive command"); + + match cmd_registered { + PluginCommand::Registered => {}, + _ => panic!("Wrong plugin command received"), + }; + + spawn(move || { + let service_info_arc = Arc::new({ + let service_type = "somehost._tcp.local."; + let instance_name = "somehost"; + let ip = ips.join(","); + let host_name = "somehost.local."; + let port = 5200; + let properties = [("property_1", "test"), ("property_2", "1234")]; + + ServiceInfo::new( + service_type, + instance_name, + host_name, + ip, + port, + &properties[..], + ).unwrap() + }); + + loop { + let cmd = papi_recv.recv(); + + match cmd { + Ok(PluginCommand::Registered) => {}, + Ok(PluginCommand::Exit(sender)) => { + sender.send(()).unwrap(); + return; + }, + Ok(PluginCommand::ListServices(sender)) => { + let mut map = HashMap::new(); + + map.insert("somehost.local.".to_string(), service_info_arc.clone()); + + sender.send(map).unwrap(); + }, + Err(_) => return + } + } + }); + + let browse_chan = mdns_client.resolve_hostname("somehost.local.", None).unwrap(); + + let mut resolved = false; + + while let Ok(event) = browse_chan.recv() { + match event { + HostnameResolutionEvent::AddressesFound(host, _addresses) => { + resolved = true; + println!("Resolved a service of {}", &host); + break; + } + other => { + println!("Received event {:?}", other); + } + } + } + assert!(resolved); + + mdns_server.shutdown().unwrap(); + mdns_client.shutdown().unwrap(); +} + /// A helper function to include a timestamp for println. fn timed_println(msg: String) { let now = SystemTime::now(); From c6b9f7f1925d468065a4995cc561d8cab958eb4b Mon Sep 17 00:00:00 2001 From: Nickolay Ilyushin Date: Tue, 29 Oct 2024 19:34:57 +0200 Subject: [PATCH 4/5] Reformat code --- src/service_daemon.rs | 17 ++++++++--------- tests/mdns_test.rs | 32 ++++++++++++++++++++------------ 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/src/service_daemon.rs b/src/service_daemon.rs index 93062a5..66dfdcb 100644 --- a/src/service_daemon.rs +++ b/src/service_daemon.rs @@ -48,6 +48,8 @@ use flume::{bounded, Sender, TrySendError}; use if_addrs::{IfAddr, Interface}; use polling::Poller; use socket2::{SockAddr, Socket}; +use std::cell::LazyCell; +use std::sync::Arc; use std::{ cmp::{self, Reverse}, collections::{BinaryHeap, HashMap, HashSet}, @@ -58,8 +60,6 @@ use std::{ time::Duration, vec, }; -use std::cell::LazyCell; -use std::sync::Arc; /// A simple macro to report all kinds of errors. macro_rules! e_fmt { @@ -293,11 +293,7 @@ impl ServiceDaemon { /// Please note that enabling the feature enables fetching the plugin-provided services /// on *every* request, so this is disabled by default due to extra overhead. #[cfg(feature = "plugins")] - pub fn register_plugin( - &self, - name: String, - pc_send: Sender, - ) -> Result<()> { + pub fn register_plugin(&self, name: String, pc_send: Sender) -> Result<()> { self.send_cmd(Command::RegisterPlugin(name, pc_send)) } @@ -2452,8 +2448,11 @@ impl Zeroconf { self.plugin_senders.insert(name, papi_send.clone()); match papi_send.send(PluginCommand::Registered) { - Ok(()) => {}, - Err(e) => warn!("Failed to send a registration notification to a plugin: {}", e), + Ok(()) => {} + Err(e) => warn!( + "Failed to send a registration notification to a plugin: {}", + e + ), }; } } diff --git a/tests/mdns_test.rs b/tests/mdns_test.rs index a0da5bb..d683073 100644 --- a/tests/mdns_test.rs +++ b/tests/mdns_test.rs @@ -1,7 +1,12 @@ +#[cfg(feature = "plugins")] +use flume::bounded; use if_addrs::{IfAddr, Interface}; -use mdns_sd::{DaemonEvent, DaemonStatus, HostnameResolutionEvent, IfKind, IntoTxtProperties, ServiceDaemon, ServiceEvent, ServiceInfo, UnregisterStatus}; #[cfg(feature = "plugins")] -use mdns_sd::{PluginCommand}; +use mdns_sd::PluginCommand; +use mdns_sd::{ + DaemonEvent, DaemonStatus, HostnameResolutionEvent, IfKind, IntoTxtProperties, ServiceDaemon, + ServiceEvent, ServiceInfo, UnregisterStatus, +}; use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; #[cfg(feature = "plugins")] @@ -10,8 +15,6 @@ use std::thread::sleep; #[cfg(feature = "plugins")] use std::thread::spawn; use std::time::{Duration, SystemTime}; -#[cfg(feature = "plugins")] -use flume::bounded; // use test_log::test; // commented out for debugging a flaky test in CI. /// This test covers: @@ -1477,12 +1480,14 @@ fn plugin_support_test() { let (papi_send, papi_recv) = bounded(100); - mdns_server.register_plugin("test".to_string(), papi_send).expect("failed to register plugin"); + mdns_server + .register_plugin("test".to_string(), papi_send) + .expect("failed to register plugin"); let cmd_registered = papi_recv.recv().expect("failed to receive command"); match cmd_registered { - PluginCommand::Registered => {}, + PluginCommand::Registered => {} _ => panic!("Wrong plugin command received"), }; @@ -1502,31 +1507,34 @@ fn plugin_support_test() { ip, port, &properties[..], - ).unwrap() + ) + .unwrap() }); loop { let cmd = papi_recv.recv(); match cmd { - Ok(PluginCommand::Registered) => {}, + Ok(PluginCommand::Registered) => {} Ok(PluginCommand::Exit(sender)) => { sender.send(()).unwrap(); return; - }, + } Ok(PluginCommand::ListServices(sender)) => { let mut map = HashMap::new(); map.insert("somehost.local.".to_string(), service_info_arc.clone()); sender.send(map).unwrap(); - }, - Err(_) => return + } + Err(_) => return, } } }); - let browse_chan = mdns_client.resolve_hostname("somehost.local.", None).unwrap(); + let browse_chan = mdns_client + .resolve_hostname("somehost.local.", None) + .unwrap(); let mut resolved = false; From c3771cfdd4f78360a826424ba19339ff35fa169a Mon Sep 17 00:00:00 2001 From: Nickolay Ilyushin Date: Tue, 29 Oct 2024 19:47:48 +0200 Subject: [PATCH 5/5] Drop LazyCell, it's unsupported in this rustc version --- src/service_daemon.rs | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/service_daemon.rs b/src/service_daemon.rs index 66dfdcb..b0a3178 100644 --- a/src/service_daemon.rs +++ b/src/service_daemon.rs @@ -48,7 +48,6 @@ use flume::{bounded, Sender, TrySendError}; use if_addrs::{IfAddr, Interface}; use polling::Poller; use socket2::{SockAddr, Socket}; -use std::cell::LazyCell; use std::sync::Arc; use std::{ cmp::{self, Reverse}, @@ -1926,28 +1925,24 @@ impl Zeroconf { let services_by_plugins = self.list_plugin_services(); - let all_services_cell = LazyCell::new(|| { - let mut all_services: HashMap<&String, &ServiceInfo> = HashMap::new(); + let mut all_services: HashMap<&String, &ServiceInfo> = HashMap::new(); - for (k, v) in &self.my_services { - all_services.insert(k, v); - } + for (k, v) in &self.my_services { + all_services.insert(k, v); + } - for (_plugin, services) in &services_by_plugins { - for (k, v) in services { - all_services.insert(k, v); - } + for (_plugin, services) in &services_by_plugins { + for (k, v) in services { + all_services.insert(k, v); } - - all_services - }); + } for question in msg.questions.iter() { debug!("query question: {:?}", &question); let qtype = question.entry.ty; if qtype == TYPE_PTR { - for service in (*all_services_cell).values() { + for service in all_services.values() { if question.entry.name == service.get_type() || service .get_subtype() @@ -1973,7 +1968,7 @@ impl Zeroconf { } } else { if qtype == TYPE_A || qtype == TYPE_AAAA || qtype == TYPE_ANY { - for service in (*all_services_cell).values() { + for service in all_services.values() { if service.get_hostname().to_lowercase() == question.entry.name.to_lowercase() { @@ -2007,7 +2002,7 @@ impl Zeroconf { } let name_to_find = question.entry.name.to_lowercase(); - let service = match (*all_services_cell).get(&name_to_find) { + let service = match all_services.get(&name_to_find) { Some(s) => s, None => continue, };