Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jobs:

env:
BUILD_N0DES_API_SECRET: ${{ secrets.N0DES_API_SECRET }}
BUILD_DATUM_CONNECT_RELAY_URLS: ${{ vars.BUILD_DATUM_CONNECT_RELAY_URLS }}
DATUM_API_ENV: production

steps:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/manual-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:

env:
BUILD_N0DES_API_SECRET: ${{ secrets.N0DES_API_SECRET }}
BUILD_DATUM_CONNECT_RELAY_URLS: ${{ vars.BUILD_DATUM_CONNECT_RELAY_URLS }}
DATUM_API_ENV: production

steps:
Expand Down
272 changes: 268 additions & 4 deletions lib/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{fmt::Debug, net::SocketAddr, str::FromStr, sync::Arc};
use std::{fmt::Debug, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};

use iroh::{
Endpoint, EndpointId, SecretKey, discovery::dns::DnsDiscovery, endpoint::default_relay_mode,
protocol::Router,
};
use iroh_base::RelayUrl;
use iroh_n0des::ApiSecret;
use iroh_proxy_utils::upstream::UpstreamMetrics;
use iroh_proxy_utils::{
Expand All @@ -14,8 +15,13 @@ use iroh_proxy_utils::{
upstream::{AuthError, AuthHandler, UpstreamProxy},
};
use iroh_relay::dns::{DnsProtocol, DnsResolver};
use iroh_relay::{RelayConfig, RelayMap};
use n0_error::{Result, StackResultExt, StdResultExt};
use tokio::{net::TcpListener, sync::futures::Notified, task::JoinHandle};
use tokio::{
net::TcpListener,
sync::futures::Notified,
task::{JoinHandle, JoinSet},
};
use tracing::{Instrument, debug, error_span, info, instrument, warn};

use crate::{ProxyState, Repo, StateWrapper, TcpProxyData, config::Config};
Expand Down Expand Up @@ -297,12 +303,15 @@ impl OutboundProxyHandle {
/// Build a new iroh endpoint, applying all relevant details from Configuration
/// to the base endpoint setup
pub(crate) async fn build_endpoint(secret_key: SecretKey, common: &Config) -> Result<Endpoint> {
let relay_mode = relay_mode_from_env_or_build().await?;
let mut builder = match common.discovery_mode {
crate::config::DiscoveryMode::Dns => {
Endpoint::empty_builder(default_relay_mode()).secret_key(secret_key)
Endpoint::empty_builder(relay_mode).secret_key(secret_key)
}
crate::config::DiscoveryMode::Default | crate::config::DiscoveryMode::Hybrid => {
Endpoint::builder().secret_key(secret_key)
Endpoint::builder()
.relay_mode(relay_mode)
.secret_key(secret_key)
}
};
if let Some(addr) = common.ipv4_addr {
Expand Down Expand Up @@ -334,6 +343,231 @@ pub(crate) async fn build_endpoint(secret_key: SecretKey, common: &Config) -> Re
Ok(endpoint)
}

const DATUM_CONNECT_RELAY_URLS: &str = "DATUM_CONNECT_RELAY_URLS";
const BUILD_DATUM_CONNECT_RELAY_URLS: &str = "BUILD_DATUM_CONNECT_RELAY_URLS";
const STARTUP_RELAY_SELECTION_MAX: usize = 5;
const STARTUP_RELAY_PROBE_TIMEOUT: Duration = Duration::from_millis(800);

async fn relay_mode_from_env_or_build() -> Result<iroh::endpoint::RelayMode> {
if let Ok(raw_urls) = std::env::var(DATUM_CONNECT_RELAY_URLS) {
match parse_relay_urls(&raw_urls) {
Ok(relays) => {
let relays =
select_best_relays_for_startup(relays, STARTUP_RELAY_SELECTION_MAX).await;
info!(
source = %DATUM_CONNECT_RELAY_URLS,
count = relays.len(),
"using custom iroh relay list from environment"
);
return Ok(iroh::endpoint::RelayMode::Custom(relays_to_map(relays)));
}
Err(err) => {
warn!("invalid relay urls in {DATUM_CONNECT_RELAY_URLS}: {err:#}");
}
}
}

if let Some(raw_urls) = option_env!("BUILD_DATUM_CONNECT_RELAY_URLS") {
match parse_relay_urls(raw_urls) {
Ok(relays) => {
let relays =
select_best_relays_for_startup(relays, STARTUP_RELAY_SELECTION_MAX).await;
info!(
source = %BUILD_DATUM_CONNECT_RELAY_URLS,
count = relays.len(),
"using custom iroh relay list from build environment"
);
return Ok(iroh::endpoint::RelayMode::Custom(relays_to_map(relays)));
}
Err(err) => {
warn!("invalid relay urls in {BUILD_DATUM_CONNECT_RELAY_URLS}: {err:#}");
}
}
}

Ok(default_relay_mode())
}

fn parse_relay_urls(raw: &str) -> Result<Vec<RelayUrl>> {
let relays: Vec<RelayUrl> = raw
.split(|c: char| c == ',' || c == ';' || c.is_ascii_whitespace())
.map(str::trim)
.filter(|s| !s.is_empty())
.map(normalize_relay_url)
.map(|url| RelayUrl::from_str(&url))
.collect::<std::result::Result<Vec<_>, _>>()
.std_context(
"Failed parsing relay URL list. Expected comma/space/newline separated URLs",
)?;

if relays.is_empty() {
n0_error::bail_any!("Relay URL list was provided but empty after parsing");
}

let mut deduped = Vec::with_capacity(relays.len());
for relay in relays {
if !deduped.iter().any(|seen: &RelayUrl| seen == &relay) {
deduped.push(relay);
}
}
Ok(deduped)
}

fn normalize_relay_url(raw: &str) -> String {
if raw.contains("://") {
raw.to_string()
} else {
format!("https://{raw}")
}
}

async fn select_best_relays_for_startup(relays: Vec<RelayUrl>, max_relays: usize) -> Vec<RelayUrl> {
let total_candidates = relays.len();
if relays.len() <= max_relays {
return relays;
}

let client = match reqwest::Client::builder()
.timeout(STARTUP_RELAY_PROBE_TIMEOUT)
.build()
{
Ok(client) => client,
Err(err) => {
warn!("relay probe setup failed, using first {max_relays} relays: {err:#}");
return relays.into_iter().take(max_relays).collect();
}
};

let mut joinset = JoinSet::new();
for relay in relays.iter().cloned() {
let client = client.clone();
joinset.spawn(async move {
let latency = probe_relay_latency(&client, &relay).await;
(relay, latency)
});
}

let mut successful = Vec::new();
let mut failed = Vec::new();
while let Some(joined) = joinset.join_next().await {
match joined {
Ok((relay, Ok(latency))) => successful.push((relay, latency)),
Ok((relay, Err(reason))) => failed.push((relay, reason)),
Err(err) => {
debug!("relay probe task join error: {err:#}");
}
}
}

successful.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.as_str().cmp(b.0.as_str())));
let mut selected: Vec<RelayUrl> = successful
.iter()
.take(max_relays)
.map(|(relay, _)| relay.clone())
.collect();

if selected.len() < max_relays {
failed.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
for (relay, _) in &failed {
if selected.len() == max_relays {
break;
}
if !selected.iter().any(|r| r == relay) {
selected.push(relay.clone());
}
}
}

if selected.len() < max_relays {
for relay in relays {
if selected.len() == max_relays {
break;
}
if !selected.iter().any(|r| r == &relay) {
selected.push(relay);
}
}
}

if !failed.is_empty() {
let failure_samples: Vec<String> = failed
.iter()
.take(5)
.map(|(relay, reason)| format!("{relay} -> {reason}"))
.collect();
warn!(
failed = failed.len(),
samples = ?failure_samples,
"relay ping probe failures observed"
);
}
info!(
total = total_candidates,
successful = successful.len(),
selected = selected.len(),
selected_relays = ?selected,
"selected startup relay shortlist"
);
selected
}

async fn probe_relay_latency(
client: &reqwest::Client,
relay: &RelayUrl,
) -> std::result::Result<Duration, String> {
let host = relay
.host_str()
.ok_or_else(|| "missing host in relay url".to_string())?
// RelayUrl canonicalizes with trailing dot, which can fail strict TLS hostname checks.
.trim_end_matches('.');
let mut https_url = reqwest::Url::parse(&format!("https://{host}/ping"))
.map_err(|err| format!("url parse: {err}"))?;
https_url.set_query(None);
debug!(
relay = %relay,
url = %https_url,
timeout_ms = STARTUP_RELAY_PROBE_TIMEOUT.as_millis(),
"starting relay ping probe"
);
let start = tokio::time::Instant::now();
match client.get(https_url.clone()).send().await {
Ok(resp) if resp.status().is_success() => {
let elapsed = start.elapsed();
debug!(
relay = %relay,
url = %https_url,
status = %resp.status(),
elapsed_ms = elapsed.as_millis(),
"relay ping probe succeeded"
);
Ok(elapsed)
}
Ok(resp) => {
debug!(
relay = %relay,
url = %https_url,
status = %resp.status(),
elapsed_ms = start.elapsed().as_millis(),
"relay ping probe got non-success response"
);
Err(format!("status {}", resp.status()))
}
Err(err) => {
debug!(
relay = %relay,
url = %https_url,
elapsed_ms = start.elapsed().as_millis(),
"relay ping probe request failed: {err:#}"
);
Err(format!("{err:#}"))
}
}
}

fn relays_to_map(relays: Vec<RelayUrl>) -> RelayMap {
RelayMap::from_iter(relays.into_iter().map(RelayConfig::from))
}

pub(crate) fn n0des_api_secret_from_env() -> Result<Option<ApiSecret>> {
let api_secret_str = match std::env::var("N0DES_API_SECRET") {
Ok(s) => s,
Expand Down Expand Up @@ -380,3 +614,33 @@ pub(crate) async fn build_n0des_client(
info!(remote=%remote_id.fmt_short(), "Connected to n0des endpoint for metrics collection");
Ok(Arc::new(client))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn parse_relay_urls_accepts_bare_hostnames() {
let input = "iroh-relay.us-east-1.datumconnect.net,iroh-relay.us-west-1.datumconnect.net";
let parsed = parse_relay_urls(input).expect("should parse");
assert_eq!(parsed.len(), 2);
assert_eq!(parsed[0].scheme(), "https");
assert_eq!(
parsed[0].host_str(),
Some("iroh-relay.us-east-1.datumconnect.net.")
);
assert_eq!(
parsed[1].host_str(),
Some("iroh-relay.us-west-1.datumconnect.net.")
);
}

#[test]
fn parse_relay_urls_dedupes_and_skips_empty_tokens() {
let input = " relay-a.example.com, relay-a.example.com;;relay-b.example.com ";
let parsed = parse_relay_urls(input).expect("should parse");
assert_eq!(parsed.len(), 2);
assert_eq!(parsed[0].host_str(), Some("relay-a.example.com."));
assert_eq!(parsed[1].host_str(), Some("relay-b.example.com."));
}
}
Loading