Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
run: cargo clippy -- -D warnings
- name: Run tests with debugs
if: matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest'
run: RUST_LOG=debug cargo test --verbose
run: RUST_LOG=debug cargo test --verbose --features plugins
- name: Run tests on Windows
if: matrix.os == 'windows-latest'
run: cargo test
run: cargo test --features plugins
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ description = "mDNS Service Discovery library with no async runtime dependency"
async = ["flume/async"]
logging = ["log"]
default = ["async", "logging"]
plugins = []

[dependencies]
flume = { version = "0.11", default-features = false } # channel between threads
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ mod log {
mod dns_cache;
mod dns_parser;
mod error;
#[cfg(feature = "plugins")]
mod plugin;
mod service_daemon;
mod service_info;

Expand All @@ -158,5 +160,8 @@ pub use service_daemon::{
};
pub use service_info::{AsIpAddrs, IntoTxtProperties, ServiceInfo, TxtProperties, TxtProperty};

#[cfg(feature = "plugins")]
pub use plugin::PluginCommand;

/// A handler to receive messages from [ServiceDaemon]. Re-export from `flume` crate.
pub use flume::Receiver;
15 changes: 15 additions & 0 deletions src/plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use crate::ServiceInfo;
use flume::Sender;
use std::collections::HashMap;
use std::sync::Arc;

/// Commands to be implemented by plugins
#[derive(Debug)]
pub enum PluginCommand {
Registered,

/// Command to fetch services that are currently provided by the plugin
ListServices(Sender<HashMap<String, Arc<ServiceInfo>>>),

Exit(Sender<()>),
}
146 changes: 143 additions & 3 deletions src/service_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
// corresponds to a set of DNS Resource Records.
#[cfg(feature = "logging")]
use crate::log::{debug, error, warn};
#[cfg(feature = "plugins")]
use crate::plugin::PluginCommand;
use crate::{
dns_cache::DnsCache,
dns_parser::{
Expand All @@ -46,6 +48,7 @@ use flume::{bounded, Sender, TrySendError};
use if_addrs::{IfAddr, Interface};
use polling::Poller;
use socket2::{SockAddr, Socket};
use std::sync::Arc;
use std::{
cmp::{self, Reverse},
collections::{BinaryHeap, HashMap, HashSet},
Expand Down Expand Up @@ -278,6 +281,21 @@ impl ServiceDaemon {
self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
}

/// Registers a plugin provided by the library consumer, to support dynamic mDNS resolution.
///
/// Please be aware that this resolution should be relatively consistent, e.g. configured
/// externally.
///
/// If feature `plugins` is enabled, the daemon will send requests to the plugins
/// using the flume channel sender for which needs to be provided as `pc_send`.
///
/// Please note that enabling the feature enables fetching the plugin-provided services
/// on *every* request, so this is disabled by default due to extra overhead.
#[cfg(feature = "plugins")]
pub fn register_plugin(&self, name: String, pc_send: Sender<PluginCommand>) -> Result<()> {
self.send_cmd(Command::RegisterPlugin(name, pc_send))
}

/// Registers a service provided by this host.
///
/// If `service_info` has no addresses yet and its `addr_auto` is enabled,
Expand Down Expand Up @@ -401,9 +419,29 @@ impl ServiceDaemon {
fn daemon_thread(signal_sock: UdpSocket, poller: Poller, receiver: Receiver<Command>) {
let zc = Zeroconf::new(signal_sock, poller);

#[cfg(feature = "plugins")]
let plugin_senders = zc.plugin_senders.clone();

if let Some(cmd) = Self::run(zc, receiver) {
match cmd {
Command::Exit(resp_s) => {
#[cfg(feature = "plugins")]
for (plugin, sender) in plugin_senders.clone() {
let (p_send, p_recv) = bounded(1);

match sender.send(PluginCommand::Exit(p_send)) {
Ok(()) => {}
Err(e) => {
error!("failed to send plugin exit command: {}, {}", plugin, e)
}
};

match p_recv.recv() {
Ok(()) => debug!("plugin {} exited successfully", plugin),
Err(e) => error!("plugin {} failed to exit: {}", plugin, e),
}
}

// It is guaranteed that the receiver already dropped,
// i.e. the daemon command channel closed.
if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
Expand Down Expand Up @@ -658,6 +696,11 @@ impl ServiceDaemon {
zc.monitors.push(resp_s);
}

#[cfg(feature = "plugins")]
Command::RegisterPlugin(name, papi_send) => {
zc.register_plugin(name, papi_send);
}

Command::SetOption(daemon_opt) => {
zc.process_set_option(daemon_opt);
}
Expand Down Expand Up @@ -930,6 +973,9 @@ struct Zeroconf {

/// Service instances that are pending for resolving SRV and TXT.
pending_resolves: HashSet<String>,

#[cfg(feature = "plugins")]
plugin_senders: HashMap<String, Sender<PluginCommand>>,
}

impl Zeroconf {
Expand Down Expand Up @@ -979,6 +1025,8 @@ impl Zeroconf {
timers,
status,
pending_resolves: HashSet::new(),
#[cfg(feature = "plugins")]
plugin_senders: HashMap::new(),
}
}

Expand Down Expand Up @@ -1875,12 +1923,26 @@ impl Zeroconf {
// See https://datatracker.ietf.org/doc/html/rfc6763#section-9
const META_QUERY: &str = "_services._dns-sd._udp.local.";

let services_by_plugins = self.list_plugin_services();

let mut all_services: HashMap<&String, &ServiceInfo> = HashMap::new();

for (k, v) in &self.my_services {
all_services.insert(k, v);
}

for (_plugin, services) in &services_by_plugins {
for (k, v) in services {
all_services.insert(k, v);
}
}

for question in msg.questions.iter() {
debug!("query question: {:?}", &question);
let qtype = question.entry.ty;

if qtype == TYPE_PTR {
for service in self.my_services.values() {
for service in all_services.values() {
if question.entry.name == service.get_type()
|| service
.get_subtype()
Expand All @@ -1906,7 +1968,7 @@ impl Zeroconf {
}
} else {
if qtype == TYPE_A || qtype == TYPE_AAAA || qtype == TYPE_ANY {
for service in self.my_services.values() {
for service in all_services.values() {
if service.get_hostname().to_lowercase()
== question.entry.name.to_lowercase()
{
Expand Down Expand Up @@ -1940,7 +2002,7 @@ impl Zeroconf {
}

let name_to_find = question.entry.name.to_lowercase();
let service = match self.my_services.get(&name_to_find) {
let service = match all_services.get(&name_to_find) {
Some(s) => s,
None => continue,
};
Expand Down Expand Up @@ -2315,6 +2377,79 @@ impl Zeroconf {
self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
}

// Returns (Plugin, Map<Service, Info>)
#[cfg(feature = "plugins")]
fn list_plugin_services(&self) -> Vec<(String, HashMap<String, Arc<ServiceInfo>>)> {
let mut output = vec![];

for key in self.plugin_senders.keys() {
output.push((key.clone(), self.list_plugin_services_for(key)));
}

output
}

#[cfg(not(feature = "plugins"))]
fn list_plugin_services(&self) -> Vec<(String, HashMap<String, Arc<ServiceInfo>>)> {
vec![]
}

#[cfg(feature = "plugins")]
fn list_plugin_services_for(&self, plugin: &str) -> HashMap<String, Arc<ServiceInfo>> {
let (r_send, r_recv) = bounded(1);

let p_send = match self.plugin_senders.get(plugin) {
None => {
warn!("Could not find plugin {}", plugin);

return HashMap::new();
}
Some(p_send) => p_send,
};

match p_send.send(PluginCommand::ListServices(r_send)) {
Ok(()) => {}
Err(e) => warn!("Failed to send ListServices command: {}", e),
}

r_recv.recv().unwrap_or_else(|e| {
warn!("Could not receive service list: {}", e);

HashMap::new()
})
}

#[cfg(feature = "plugins")]
fn register_plugin(&mut self, name: String, papi_send: Sender<PluginCommand>) {
if self.plugin_senders.contains_key(&name) {
let old_send = self.plugin_senders.get(&name).unwrap();

let (exit_send, exit_recv) = bounded(1);

match old_send.send(PluginCommand::Exit(exit_send)) {
Ok(()) => debug!("Requested old plugin exit"),
Err(e) => warn!("Failed to send exit command to a plugin: {}", e),
}

match exit_recv.recv_timeout(Duration::from_secs(1)) {
Ok(()) => debug!("The old plugin exited"),
Err(e) => warn!("Old plugin's exit timed out: {}", e),
}
}

debug!("Registered a new plugin: {}", name);

self.plugin_senders.insert(name, papi_send.clone());

match papi_send.send(PluginCommand::Registered) {
Ok(()) => {}
Err(e) => warn!(
"Failed to send a registration notification to a plugin: {}",
e
),
};
}
}

/// All possible events sent to the client from the daemon
Expand Down Expand Up @@ -2410,6 +2545,9 @@ enum Command {

SetOption(DaemonOption),

#[cfg(feature = "plugins")]
RegisterPlugin(String, Sender<PluginCommand>),

Exit(Sender<DaemonStatus>),
}

Expand All @@ -2429,6 +2567,8 @@ impl fmt::Display for Command {
Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
Self::Unregister(_, _) => write!(f, "Command Unregister"),
Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
#[cfg(feature = "plugins")]
Self::RegisterPlugin(name, _) => write!(f, "Command RegisterPlugin: {}", name),
Self::Resolve(_, _) => write!(f, "Command Resolve"),
}
}
Expand Down
Loading