diff --git a/easytier-contrib/easytier-uptime/src/health_checker.rs b/easytier-contrib/easytier-uptime/src/health_checker.rs index 13c0f91e..d1445df3 100644 --- a/easytier-contrib/easytier-uptime/src/health_checker.rs +++ b/easytier-contrib/easytier-uptime/src/health_checker.rs @@ -8,7 +8,8 @@ use anyhow::Context as _; use dashmap::DashMap; use easytier::{ common::config::{ - ConfigFileControl, ConfigLoader, NetworkIdentity, PeerConfig, TomlConfigLoader, + ConfigFileControl, ConfigLoader, DEFAULT_CONNECTION_PRIORITY, NetworkIdentity, PeerConfig, + TomlConfigLoader, }, instance_manager::NetworkInstanceManager, }; @@ -360,6 +361,7 @@ impl HealthChecker { .parse() .with_context(|| "failed to parse peer uri")?, peer_public_key: None, + priority: DEFAULT_CONNECTION_PRIORITY, }]); let inst_id = inst_id.unwrap_or(uuid::Uuid::new_v4()); diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index 2f02b8c8..aabcdc44 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -28,6 +28,55 @@ use super::env_parser; pub type Flags = crate::proto::common::FlagsInConfig; +pub const DEFAULT_CONNECTION_PRIORITY: u32 = 0; + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(untagged)] +enum ListenerConfigDef { + Url(url::Url), + Config { + url: url::Url, + #[serde(default)] + priority: u32, + }, +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +#[serde(from = "ListenerConfigDef")] +pub struct ListenerConfig { + pub url: url::Url, + pub priority: u32, +} + +impl ListenerConfig { + pub fn new(url: url::Url, priority: u32) -> Self { + Self { url, priority } + } + + pub fn with_default_priority(url: url::Url) -> Self { + Self::new(url, DEFAULT_CONNECTION_PRIORITY) + } +} + +impl From for ListenerConfig { + fn from(url: url::Url) -> Self { + Self::with_default_priority(url) + } +} + +impl From for ListenerConfig { + fn from(def: ListenerConfigDef) -> Self { + match def { + ListenerConfigDef::Url(url) => Self::with_default_priority(url), + ListenerConfigDef::Config { url, priority } => Self::new(url, priority), + } + } +} + +fn listener_config_urls(listeners: Vec) -> Vec { + listeners.into_iter().map(|listener| listener.url).collect() +} + pub fn gen_default_flags() -> Flags { #[allow(deprecated)] Flags { @@ -196,6 +245,7 @@ pub trait ConfigLoader: Send + Sync { fn set_network_identity(&self, identity: NetworkIdentity); fn get_listener_uris(&self) -> Vec; + fn get_listener_configs(&self) -> Vec; fn get_peers(&self) -> Vec; fn set_peers(&self, peers: Vec); @@ -205,6 +255,8 @@ pub trait ConfigLoader: Send + Sync { fn get_mapped_listeners(&self) -> Vec; fn set_mapped_listeners(&self, listeners: Option>); + fn get_mapped_listener_configs(&self) -> Vec; + fn set_mapped_listener_configs(&self, listeners: Option>); fn get_vpn_portal_config(&self) -> Option; fn set_vpn_portal_config(&self, config: VpnPortalConfig); @@ -403,6 +455,8 @@ impl Default for NetworkIdentity { pub struct PeerConfig { pub uri: url::Url, pub peer_public_key: Option, + #[serde(default)] + pub priority: u32, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] @@ -534,8 +588,8 @@ struct Config { ipv6_public_addr_prefix: Option, dhcp: Option, network_identity: Option, - listeners: Option>, - mapped_listeners: Option>, + listeners: Option>, + mapped_listeners: Option>, exit_nodes: Option>, peer: Option>, @@ -849,6 +903,10 @@ impl ConfigLoader for TomlConfigLoader { } fn get_listener_uris(&self) -> Vec { + listener_config_urls(self.get_listener_configs()) + } + + fn get_listener_configs(&self) -> Vec { self.config .lock() .unwrap() @@ -866,14 +924,29 @@ impl ConfigLoader for TomlConfigLoader { } fn get_listeners(&self) -> Option> { - self.config.lock().unwrap().listeners.clone() + self.config + .lock() + .unwrap() + .listeners + .clone() + .map(listener_config_urls) } fn set_listeners(&self, listeners: Vec) { - self.config.lock().unwrap().listeners = Some(listeners); + self.config.lock().unwrap().listeners = + Some(listeners.into_iter().map(Into::into).collect()); } fn get_mapped_listeners(&self) -> Vec { + listener_config_urls(self.get_mapped_listener_configs()) + } + + fn set_mapped_listeners(&self, listeners: Option>) { + self.config.lock().unwrap().mapped_listeners = + listeners.map(|listeners| listeners.into_iter().map(Into::into).collect()); + } + + fn get_mapped_listener_configs(&self) -> Vec { self.config .lock() .unwrap() @@ -882,7 +955,7 @@ impl ConfigLoader for TomlConfigLoader { .unwrap_or_default() } - fn set_mapped_listeners(&self, listeners: Option>) { + fn set_mapped_listener_configs(&self, listeners: Option>) { self.config.lock().unwrap().mapped_listeners = listeners; } @@ -1402,6 +1475,60 @@ members = ["admin"] assert_eq!(group.members, vec!["admin"]); } + #[test] + fn test_listener_priority_config_supports_old_and_structured_values() { + let config = TomlConfigLoader::new_from_str( + r#" +listeners = [ + "tcp://0.0.0.0:11010", + { url = "udp://0.0.0.0:11010", priority = 80 }, +] +mapped_listeners = [ + "tcp://example.com:11010", + { url = "tcp://frps.example.com:30001", priority = 100 }, +] + +[[peer]] +uri = "tcp://proxy.example.com:443" +priority = 100 + +[[peer]] +uri = "tcp://normal.example.com:11010" +"#, + ) + .unwrap(); + + let listeners = config.get_listener_configs(); + assert_eq!(listeners[0].url.to_string(), "tcp://0.0.0.0:11010"); + assert_eq!(listeners[0].priority, DEFAULT_CONNECTION_PRIORITY); + assert_eq!(listeners[1].url.to_string(), "udp://0.0.0.0:11010"); + assert_eq!(listeners[1].priority, 80); + + let mapped_listeners = config.get_mapped_listener_configs(); + assert_eq!( + mapped_listeners[0].url.to_string(), + "tcp://example.com:11010" + ); + assert_eq!(mapped_listeners[0].priority, DEFAULT_CONNECTION_PRIORITY); + assert_eq!( + mapped_listeners[1].url.to_string(), + "tcp://frps.example.com:30001" + ); + assert_eq!(mapped_listeners[1].priority, 100); + + let peers = config.get_peers(); + assert_eq!(peers[0].uri.to_string(), "tcp://proxy.example.com:443"); + assert_eq!(peers[0].priority, 100); + assert_eq!(peers[1].uri.to_string(), "tcp://normal.example.com:11010"); + assert_eq!(peers[1].priority, DEFAULT_CONNECTION_PRIORITY); + + let dumped = config.dump(); + let reloaded = TomlConfigLoader::new_from_str(&dumped).unwrap(); + assert_eq!(reloaded.get_listener_configs(), listeners); + assert_eq!(reloaded.get_mapped_listener_configs(), mapped_listeners); + assert_eq!(reloaded.get_peers(), peers); + } + #[test] fn test_network_config_source_user_is_implicit() { let config = TomlConfigLoader::default(); diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 48645deb..5e426caf 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -11,7 +11,7 @@ use dashmap::DashMap; use super::{ PeerId, - config::{ConfigLoader, Flags}, + config::{ConfigLoader, DEFAULT_CONNECTION_PRIORITY, Flags, ListenerConfig}, netns::NetNS, network::IPCollector, stun::{StunInfoCollector, StunInfoCollectorTrait}, @@ -212,7 +212,7 @@ pub struct GlobalCtx { stun_info_collection: Mutex>, - running_listeners: Mutex>, + running_listeners: Mutex>, advertised_ipv6_public_addr_prefix: Mutex>, flags: ArcSwap, @@ -509,13 +509,28 @@ impl GlobalCtx { } pub fn get_running_listeners(&self) -> Vec { + self.running_listeners + .lock() + .unwrap() + .iter() + .map(|listener| listener.url.clone()) + .collect() + } + + pub fn get_running_listener_configs(&self) -> Vec { self.running_listeners.lock().unwrap().clone() } pub fn add_running_listener(&self, url: url::Url) { + self.add_running_listener_with_priority(url, DEFAULT_CONNECTION_PRIORITY); + } + + pub fn add_running_listener_with_priority(&self, url: url::Url, priority: u32) { let mut l = self.running_listeners.lock().unwrap(); - if !l.contains(&url) { - l.push(url); + if let Some(listener) = l.iter_mut().find(|listener| listener.url == url) { + listener.priority = priority; + } else { + l.push(ListenerConfig::new(url, priority)); } } @@ -744,11 +759,9 @@ impl GlobalCtx { } fn is_port_in_running_listeners(&self, port: u16, is_udp: bool) -> bool { - self.running_listeners - .lock() - .unwrap() - .iter() - .any(|x| x.port() == Some(port) && matches_protocol!(x, Protocol::UDP) == is_udp) + self.running_listeners.lock().unwrap().iter().any(|x| { + x.url.port() == Some(port) && matches_protocol!(&x.url, Protocol::UDP) == is_udp + }) } #[tracing::instrument(ret, skip(self))] diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index e6296f11..2b54d21a 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -13,8 +13,8 @@ use std::{ use crate::{ common::{ - PeerId, dns::socket_addrs, error::Error, global_ctx::ArcGlobalCtx, - stun::StunInfoCollectorTrait, + PeerId, config::DEFAULT_CONNECTION_PRIORITY, dns::socket_addrs, error::Error, + global_ctx::ArcGlobalCtx, stun::StunInfoCollectorTrait, }, connector::udp_hole_punch::handle_rpc_result, peers::{ @@ -31,7 +31,7 @@ use crate::{ }, rpc_types::controller::BaseController, }, - tunnel::{IpVersion, matches_protocol, udp::UdpTunnelConnector}, + tunnel::{IpVersion, PrioritizedConnector, matches_protocol, udp::UdpTunnelConnector}, use_global_var, }; @@ -48,6 +48,7 @@ use url::Host; pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1; pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300; +const DIRECT_CONNECTOR_LOW_PRIORITY_RETRY_TIMEOUT_SEC: u64 = 300; static TESTING: AtomicBool = AtomicBool::new(false); @@ -131,11 +132,70 @@ struct DstBlackListItem(PeerId, String); #[derive(Hash, Eq, PartialEq, Clone)] struct DstListenerUrlBlackListItem(PeerId, String); +#[derive(Clone, Debug)] +struct AvailableListener { + url: url::Url, + priority: u32, +} + +fn available_listeners_from_ip_list( + ip_list: &GetIpListResponse, + enable_ipv6: bool, +) -> Vec { + let candidate_listeners: Vec = if ip_list.listener_infos.is_empty() { + ip_list + .listeners + .iter() + .map(|url| AvailableListener { + url: url.clone().into(), + priority: DEFAULT_CONNECTION_PRIORITY, + }) + .collect() + } else { + ip_list + .listener_infos + .iter() + .filter_map(|info| { + info.url.as_ref().map(|url| AvailableListener { + url: url.clone().into(), + priority: info.priority, + }) + }) + .collect() + }; + + candidate_listeners + .into_iter() + .filter(|l| l.url.scheme() != "ring") + .filter(|l| { + mapped_listener_port(&l.url).is_some() + && l.url + .host() + .is_some_and(|host| enable_ipv6 || !matches!(host, Host::Ipv6(_))) + }) + .collect() +} + +fn sort_available_listeners(available_listeners: &mut [AvailableListener], default_protocol: &str) { + available_listeners.sort_by_key(|l| { + let scheme = l.url.scheme(); + let protocol_priority = if scheme == default_protocol { + 3 + } else if scheme == "udp" { + 2 + } else { + 1 + }; + (std::cmp::Reverse(l.priority), protocol_priority) + }); +} + struct DirectConnectorManagerData { global_ctx: ArcGlobalCtx, peer_manager: Arc, dst_listener_blacklist: timedmap::TimedMap, peer_black_list: timedmap::TimedMap, + low_priority_direct_retry_backoff: timedmap::TimedMap, } impl DirectConnectorManagerData { @@ -145,6 +205,7 @@ impl DirectConnectorManagerData { peer_manager, dst_listener_blacklist: timedmap::TimedMap::new(), peer_black_list: timedmap::TimedMap::new(), + low_priority_direct_retry_backoff: timedmap::TimedMap::new(), } } @@ -201,6 +262,7 @@ impl DirectConnectorManagerData { &self, dst_peer_id: PeerId, remote_url: &url::Url, + priority: u32, ) -> Result<(PeerId, PeerConnId), Error> { let local_socket = Arc::new( UdpSocket::bind("[::]:0") @@ -239,7 +301,12 @@ impl DirectConnectorManagerData { // NOTICE: must add as directly connected tunnel self.peer_manager - .add_client_tunnel_with_peer_id_hint(ret, true, Some(dst_peer_id)) + .add_client_tunnel_with_peer_id_hint_and_priority( + ret, + true, + Some(dst_peer_id), + priority, + ) .await } @@ -247,6 +314,7 @@ impl DirectConnectorManagerData { &self, dst_peer_id: PeerId, remote_url: &url::Url, + priority: u32, ) -> Result<(PeerId, PeerConnId), Error> { let local_socket = { let _g = self.global_ctx.net_ns.guard(); @@ -275,21 +343,34 @@ impl DirectConnectorManagerData { .await?; self.peer_manager - .add_client_tunnel_with_peer_id_hint(ret, true, Some(dst_peer_id)) + .add_client_tunnel_with_peer_id_hint_and_priority( + ret, + true, + Some(dst_peer_id), + priority, + ) .await } - async fn do_try_connect_to_ip(&self, dst_peer_id: PeerId, addr: String) -> Result<(), Error> { + async fn do_try_connect_to_ip( + &self, + dst_peer_id: PeerId, + addr: String, + priority: u32, + ) -> Result<(), Error> { let connector = create_connector_by_url(&addr, &self.global_ctx, IpVersion::Both).await?; let remote_url = connector.remote_url(); let (peer_id, conn_id) = if matches_scheme!(remote_url, TunnelScheme::Ip(IpScheme::Udp)) { match remote_url.host() { Some(Host::Ipv6(_)) => { - self.connect_to_public_ipv6(dst_peer_id, &remote_url) + self.connect_to_public_ipv6(dst_peer_id, &remote_url, priority) .await? } Some(Host::Ipv4(ip)) if is_public_ipv4(ip) => { - match self.connect_to_public_ipv4(dst_peer_id, &remote_url).await { + match self + .connect_to_public_ipv4(dst_peer_id, &remote_url, priority) + .await + { Ok(ret) => ret, Err(err) => { tracing::debug!( @@ -300,7 +381,7 @@ impl DirectConnectorManagerData { timeout( std::time::Duration::from_secs(3), self.peer_manager.try_direct_connect_with_peer_id_hint( - connector, + PrioritizedConnector::new(connector, priority), Some(dst_peer_id), ), ) @@ -311,8 +392,10 @@ impl DirectConnectorManagerData { _ => { timeout( std::time::Duration::from_secs(3), - self.peer_manager - .try_direct_connect_with_peer_id_hint(connector, Some(dst_peer_id)), + self.peer_manager.try_direct_connect_with_peer_id_hint( + PrioritizedConnector::new(connector, priority), + Some(dst_peer_id), + ), ) .await?? } @@ -320,8 +403,10 @@ impl DirectConnectorManagerData { } else { timeout( std::time::Duration::from_secs(3), - self.peer_manager - .try_direct_connect_with_peer_id_hint(connector, Some(dst_peer_id)), + self.peer_manager.try_direct_connect_with_peer_id_hint( + PrioritizedConnector::new(connector, priority), + Some(dst_peer_id), + ), ) .await?? }; @@ -345,6 +430,7 @@ impl DirectConnectorManagerData { self: Arc, dst_peer_id: PeerId, addr: String, + priority: u32, ) -> Result<(), Error> { let mut rand_gen = rand::rngs::OsRng; let backoff_ms = [1000, 2000, 4000]; @@ -361,19 +447,26 @@ impl DirectConnectorManagerData { return Err(Error::UrlInBlacklist); } + let has_good_direct_conn = || { + self.peer_manager + .has_directly_connected_conn_with_priority_at_most(dst_peer_id, priority) + }; + loop { - if self.peer_manager.has_directly_connected_conn(dst_peer_id) { + if has_good_direct_conn() { return Ok(()); } tracing::debug!(?dst_peer_id, ?addr, "try_connect_to_ip start one round"); - let ret = self.do_try_connect_to_ip(dst_peer_id, addr.clone()).await; + let ret = self + .do_try_connect_to_ip(dst_peer_id, addr.clone(), priority) + .await; tracing::debug!(?ret, ?dst_peer_id, ?addr, "try_connect_to_ip return"); if ret.is_ok() { return Ok(()); } - if self.peer_manager.has_directly_connected_conn(dst_peer_id) { + if has_good_direct_conn() { return Ok(()); } @@ -404,17 +497,19 @@ impl DirectConnectorManagerData { self: &Arc, dst_peer_id: PeerId, ip_list: &GetIpListResponse, - listener: &url::Url, + listener: &AvailableListener, tasks: &mut JoinSet>, ) { - let Ok(mut addrs) = resolve_mapped_listener_addrs(listener).await else { + let Ok(mut addrs) = resolve_mapped_listener_addrs(&listener.url).await else { tracing::error!(?listener, "failed to parse socket address from listener"); return; }; let listener_host = addrs.pop(); tracing::info!(?listener_host, ?listener, "try direct connect to peer"); - let is_udp = matches_protocol!(listener, Protocol::UDP); + let is_udp = matches_protocol!(&listener.url, Protocol::UDP); + let listener_url = &listener.url; + let priority = listener.priority; // Snapshot running listeners once; used for cheap port pre-checks before the // expensive should_deny_proxy call (which binds a socket per IP) in the // unspecified-address expansion loops below. @@ -449,12 +544,13 @@ impl DirectConnectorManagerData { ); return; } - let mut addr = (*listener).clone(); + let mut addr = listener_url.clone(); if addr.set_host(Some(ip.to_string().as_str())).is_ok() { tasks.spawn(Self::try_connect_to_ip( self.clone(), dst_peer_id, addr.to_string(), + priority, )); } else { tracing::error!( @@ -475,7 +571,8 @@ impl DirectConnectorManagerData { tasks.spawn(Self::try_connect_to_ip( self.clone(), dst_peer_id, - listener.to_string(), + listener_url.to_string(), + priority, )); } } @@ -504,12 +601,13 @@ impl DirectConnectorManagerData { ); return; } - let mut addr = (*listener).clone(); + let mut addr = listener_url.clone(); if addr.set_host(Some(format!("[{}]", ip).as_str())).is_ok() { tasks.spawn(Self::try_connect_to_ip( self.clone(), dst_peer_id, addr.to_string(), + priority, )); } else { tracing::error!( @@ -535,7 +633,8 @@ impl DirectConnectorManagerData { tasks.spawn(Self::try_connect_to_ip( self.clone(), dst_peer_id, - listener.to_string(), + listener_url.to_string(), + priority, )); } } @@ -553,15 +652,7 @@ impl DirectConnectorManagerData { ip_list: GetIpListResponse, ) -> Result<(), Error> { let enable_ipv6 = self.global_ctx.get_flags().enable_ipv6; - let available_listeners = ip_list - .listeners - .clone() - .into_iter() - .map(Into::::into) - .filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None }) - .filter(|l| mapped_listener_port(l).is_some() && l.host().is_some()) - .filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_))) - .collect::>(); + let mut available_listeners = available_listeners_from_ip_list(&ip_list, enable_ipv6); tracing::debug!(?available_listeners, "got available listeners"); @@ -570,35 +661,30 @@ impl DirectConnectorManagerData { } let default_protocol = self.global_ctx.get_flags().default_protocol; - // sort available listeners, default protocol has the highest priority, udp is second, others just random - // highest priority is in the last - let mut available_listeners = available_listeners; - available_listeners.sort_by_key(|l| { - let scheme = l.scheme(); - if scheme == default_protocol { - 3 - } else if scheme == "udp" { - 2 - } else { - 1 - } - }); + // Sort by configured priority first (lower is better), then prefer the + // default protocol and UDP. The best candidate group is in the last slot. + sort_available_listeners(&mut available_listeners, &default_protocol); - while !available_listeners.is_empty() { + while let Some(cur_listener) = available_listeners.last() { let mut tasks = JoinSet::new(); let mut listener_list = vec![]; - let cur_scheme = available_listeners.last().unwrap().scheme().to_owned(); + let cur_priority = cur_listener.priority; + let cur_scheme = cur_listener.url.scheme().to_owned(); while let Some(listener) = available_listeners.last() { - if listener.scheme() != cur_scheme { + if listener.priority != cur_priority || listener.url.scheme() != cur_scheme { break; } - tracing::debug!("try direct connect to peer with listener: {}", listener); + tracing::debug!( + %cur_priority, + "try direct connect to peer with listener: {}", + listener.url + ); self.spawn_direct_connect_task(dst_peer_id, &ip_list, listener, &mut tasks) .await; - listener_list.push(listener.clone().to_string()); + listener_list.push(listener.url.to_string()); available_listeners.pop(); } @@ -606,12 +692,16 @@ impl DirectConnectorManagerData { tracing::debug!( ?ret, ?dst_peer_id, + ?cur_priority, ?cur_scheme, ?listener_list, "all tasks finished for current scheme" ); - if self.peer_manager.has_directly_connected_conn(dst_peer_id) { + if self + .peer_manager + .has_directly_connected_conn_with_priority_at_most(dst_peer_id, cur_priority) + { tracing::info!( "direct connect to peer {} success, has direct conn", dst_peer_id @@ -666,13 +756,29 @@ impl DirectConnectorManagerData { .await; tracing::info!(?ret, ?dst_peer_id, "do_try_direct_connect return"); - if peer_manager.has_directly_connected_conn(dst_peer_id) { + if peer_manager.has_directly_connected_conn_with_priority_at_most( + dst_peer_id, + DEFAULT_CONNECTION_PRIORITY, + ) { tracing::info!( "direct connect to peer {} success, has direct conn", dst_peer_id ); return Ok(()); } + + if peer_manager.has_directly_connected_conn(dst_peer_id) { + self.low_priority_direct_retry_backoff.insert( + dst_peer_id, + (), + Duration::from_secs(DIRECT_CONNECTOR_LOW_PRIORITY_RETRY_TIMEOUT_SEC), + ); + tracing::info!( + "direct connect to peer {} skipped temporarily, only low-priority direct conn exists", + dst_peer_id + ); + return Ok(()); + } } } } @@ -715,6 +821,7 @@ impl PeerTaskLauncher for DirectConnectorLauncher { async fn collect_peers_need_task(&self, data: &Self::Data) -> Vec { data.peer_black_list.cleanup(); + data.low_priority_direct_retry_backoff.cleanup(); let my_peer_id = data.peer_manager.my_peer_id(); data.peer_manager .list_peers() @@ -722,7 +829,13 @@ impl PeerTaskLauncher for DirectConnectorLauncher { .into_iter() .filter(|peer_id| { *peer_id != my_peer_id - && !data.peer_manager.has_directly_connected_conn(*peer_id) + && !data + .peer_manager + .has_directly_connected_conn_with_priority_at_most( + *peer_id, + DEFAULT_CONNECTION_PRIORITY, + ) + && !data.low_priority_direct_retry_backoff.contains(peer_id) && !data.peer_black_list.contains(peer_id) }) .collect() @@ -813,12 +926,16 @@ mod tests { wait_route_appear_with_cost, }, proto::peer_rpc::GetIpListResponse, + proto::peer_rpc::ListenerInfo, tunnel::{IpScheme, TunnelScheme, matches_scheme}, }; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use super::{TESTING, mapped_listener_port, resolve_mapped_listener_addrs}; + use super::{ + DEFAULT_CONNECTION_PRIORITY, TESTING, available_listeners_from_ip_list, + mapped_listener_port, resolve_mapped_listener_addrs, sort_available_listeners, + }; #[tokio::test] async fn public_ipv6_candidate_rejects_easytier_managed_addr_even_in_tests() { @@ -868,6 +985,68 @@ mod tests { ); } + #[test] + fn available_listener_order_uses_priority_before_protocol() { + let ip_list = GetIpListResponse { + listener_infos: vec![ + ListenerInfo { + url: Some("tcp://127.0.0.1:11010".parse().unwrap()), + priority: 100, + }, + ListenerInfo { + url: Some("udp://127.0.0.1:11011".parse().unwrap()), + priority: 0, + }, + ListenerInfo { + url: Some("tcp://127.0.0.1:11012".parse().unwrap()), + priority: 0, + }, + ], + ..Default::default() + }; + + let mut listeners = available_listeners_from_ip_list(&ip_list, true); + sort_available_listeners(&mut listeners, "tcp"); + + let ordered_urls = listeners + .iter() + .rev() + .map(|listener| listener.url.to_string()) + .collect::>(); + assert_eq!( + ordered_urls, + vec![ + "tcp://127.0.0.1:11012", + "udp://127.0.0.1:11011", + "tcp://127.0.0.1:11010", + ] + ); + } + + #[test] + fn available_listener_order_keeps_legacy_listeners_at_default_priority() { + let mut ip_list = GetIpListResponse::default(); + ip_list + .listeners + .push("udp://127.0.0.1:11010".parse().unwrap()); + ip_list + .listeners + .push("tcp://127.0.0.1:11011".parse().unwrap()); + + let mut listeners = available_listeners_from_ip_list(&ip_list, true); + sort_available_listeners(&mut listeners, "tcp"); + + assert!( + listeners + .iter() + .all(|listener| listener.priority == DEFAULT_CONNECTION_PRIORITY) + ); + assert_eq!( + listeners.last().unwrap().url.to_string(), + "tcp://127.0.0.1:11011" + ); + } + #[tokio::test] async fn resolve_mapped_listener_addrs_uses_default_ports() { let wss_addrs = resolve_mapped_listener_addrs(&"wss://127.0.0.1".parse().unwrap()) diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index 8e9fd902..3bad3627 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -3,11 +3,13 @@ use std::{ sync::{Arc, Weak}, }; -use dashmap::DashSet; +use dashmap::{DashMap, DashSet}; use tokio::{sync::mpsc, task::JoinSet, time::timeout}; use crate::{ - common::{PeerId, dns::socket_addrs, join_joinset_background}, + common::{ + PeerId, config::DEFAULT_CONNECTION_PRIORITY, dns::socket_addrs, join_joinset_background, + }, peers::peer_conn::PeerConnId, proto::{ api::instance::{ @@ -16,7 +18,7 @@ use crate::{ }, rpc_types::{self, controller::BaseController}, }, - tunnel::{IpVersion, TunnelConnector}, + tunnel::{IpVersion, PrioritizedConnector, TunnelConnector}, utils::weak_upgrade, }; @@ -32,7 +34,7 @@ use crate::{ use super::create_connector_by_url; -type ConnectorMap = Arc>; +type ConnectorMap = Arc>; #[derive(Debug, Clone)] struct ReconnResult { @@ -43,7 +45,7 @@ struct ReconnResult { struct ConnectorManagerData { connectors: ConnectorMap, - reconnecting: DashSet, + reconnecting: DashMap, peer_manager: Weak, alive_conn_urls: Arc>, // user removed connector urls @@ -60,14 +62,14 @@ pub struct ManualConnectorManager { impl ManualConnectorManager { pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc) -> Self { - let connectors = Arc::new(DashSet::new()); + let connectors = Arc::new(DashMap::new()); let tasks = JoinSet::new(); let mut ret = Self { global_ctx: global_ctx.clone(), data: Arc::new(ConnectorManagerData { connectors, - reconnecting: DashSet::new(), + reconnecting: DashMap::new(), peer_manager: Arc::downgrade(&peer_manager), alive_conn_urls: Arc::new(DashSet::new()), removed_conn_urls: Arc::new(DashSet::new()), @@ -85,14 +87,26 @@ impl ManualConnectorManager { pub fn add_connector(&self, connector: T) where - T: TunnelConnector + 'static, + T: TunnelConnector, { tracing::info!("add_connector: {}", connector.remote_url()); - self.data.connectors.insert(connector.remote_url()); + let priority = connector.priority(); + self.data + .connectors + .insert(connector.remote_url(), priority); } pub async fn add_connector_by_url(&self, url: url::Url) -> Result<(), Error> { - self.data.connectors.insert(url); + self.add_connector_by_url_with_priority(url, DEFAULT_CONNECTION_PRIORITY) + .await + } + + pub async fn add_connector_by_url_with_priority( + &self, + url: url::Url, + priority: u32, + ) -> Result<(), Error> { + self.data.connectors.insert(url, priority); Ok(()) } @@ -138,19 +152,25 @@ impl ManualConnectorManager { Connector { url: Some(conn_url.into()), status: status.into(), + priority: *item.value(), }, ); } - let reconnecting_urls: BTreeSet = - self.data.reconnecting.iter().map(|x| x.clone()).collect(); + let reconnecting_urls: BTreeSet<_> = self + .data + .reconnecting + .iter() + .map(|item| (item.key().clone(), *item.value())) + .collect(); - for conn_url in reconnecting_urls { + for (conn_url, priority) in reconnecting_urls { ret.insert( 0, Connector { url: Some(conn_url.into()), status: ConnectorStatus::Connecting.into(), + priority, }, ); } @@ -177,16 +197,20 @@ impl ManualConnectorManager { for dead_url in dead_urls { let data_clone = data.clone(); let sender = reconn_result_send.clone(); - data.connectors.remove(&dead_url).unwrap(); - let insert_succ = data.reconnecting.insert(dead_url.clone()); - assert!(insert_succ); + let priority = data + .connectors + .remove(&dead_url) + .map(|(_, priority)| priority) + .unwrap_or(DEFAULT_CONNECTION_PRIORITY); + let previous = data.reconnecting.insert(dead_url.clone(), priority); + assert!(previous.is_none()); tasks.lock().unwrap().spawn(async move { - let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone() ).await; + let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone(), priority).await; let _ = sender.send(reconn_ret).await; data_clone.reconnecting.remove(&dead_url).unwrap(); - data_clone.connectors.insert(dead_url.clone()); + data_clone.connectors.insert(dead_url.clone(), priority); }); } tracing::info!("reconn_interval tick, done"); @@ -206,7 +230,7 @@ impl ManualConnectorManager { if data.connectors.remove(url).is_some() { tracing::warn!("connector: {}, removed", url); continue; - } else if data.reconnecting.contains(url) { + } else if data.reconnecting.contains_key(url) { tracing::warn!("connector: {}, reconnecting, remove later.", url); remove_later.insert(url.clone()); continue; @@ -244,6 +268,7 @@ impl ManualConnectorManager { data: Arc, dead_url: String, ip_version: IpVersion, + priority: u32, ) -> Result { let connector = create_connector_by_url(&dead_url, &data.global_ctx.clone(), ip_version).await?; @@ -257,7 +282,9 @@ impl ManualConnectorManager { ))); }; - let (peer_id, conn_id) = pm.try_direct_connect(connector).await?; + let (peer_id, conn_id) = pm + .try_direct_connect(PrioritizedConnector::new(connector, priority)) + .await?; tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); Ok(ReconnResult { dead_url, @@ -269,6 +296,7 @@ impl ManualConnectorManager { async fn conn_reconnect( data: Arc, dead_url: url::Url, + priority: u32, ) -> Result { tracing::info!("reconnect: {}", dead_url); @@ -326,6 +354,7 @@ impl ManualConnectorManager { data.clone(), dead_url.to_string(), ip_version, + priority, ), ) .await; diff --git a/easytier/src/connector/tcp_hole_punch.rs b/easytier/src/connector/tcp_hole_punch.rs index 9baaeaa0..81fe41ae 100644 --- a/easytier/src/connector/tcp_hole_punch.rs +++ b/easytier/src/connector/tcp_hole_punch.rs @@ -472,7 +472,10 @@ impl PeerTaskLauncher for TcpHolePunchPeerTaskLauncher { continue; } - if data.peer_mgr.get_peer_map().has_peer(peer_id) { + if data.peer_mgr.has_conn_with_priority_at_most( + peer_id, + crate::common::config::DEFAULT_CONNECTION_PRIORITY, + ) { tracing::trace!(peer_id, "tcp hole punch task collect skip already has peer"); continue; } diff --git a/easytier/src/connector/udp_hole_punch/mod.rs b/easytier/src/connector/udp_hole_punch/mod.rs index e1c2e14b..c043163f 100644 --- a/easytier/src/connector/udp_hole_punch/mod.rs +++ b/easytier/src/connector/udp_hole_punch/mod.rs @@ -474,7 +474,10 @@ impl PeerTaskLauncher for UdpHolePunchPeerTaskLauncher { continue; } - if data.peer_mgr.get_peer_map().has_peer(peer_id) { + if data.peer_mgr.has_conn_with_priority_at_most( + peer_id, + crate::common::config::DEFAULT_CONNECTION_PRIORITY, + ) { continue; } diff --git a/easytier/src/core.rs b/easytier/src/core.rs index d29d7708..7ec938b4 100644 --- a/easytier/src/core.rs +++ b/easytier/src/core.rs @@ -923,6 +923,7 @@ impl NetworkOptions { .parse() .with_context(|| format!("failed to parse peer uri: {}", p))?, peer_public_key: None, + priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY, }); } cfg.set_peers(peers); @@ -960,6 +961,7 @@ impl NetworkOptions { format!("failed to parse external node uri: {}", external_nodes) })?, peer_public_key: None, + priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY, }); cfg.set_peers(old_peers); } diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index 6d60d14b..2f80e5fb 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -43,7 +43,7 @@ use easytier::{ }, instance::{ AclManageRpc, AclManageRpcClientFactory, Connector, ConnectorManageRpc, - ConnectorManageRpcClientFactory, CredentialManageRpc, + ConnectorManageRpcClientFactory, ConnectorStatus, CredentialManageRpc, CredentialManageRpcClientFactory, DumpRouteRequest, ForeignNetworkEntryPb, GenerateCredentialRequest, GetAclStatsRequest, GetPrometheusStatsRequest, GetStatsRequest, GetVpnPortalInfoRequest, GetWhitelistRequest, @@ -236,6 +236,8 @@ enum ConnectorSubCommand { Add { #[arg(help = "connector url, e.g., tcp://1.2.3.4:11010")] url: String, + #[arg(short = 'p', long = "priority", default_value_t = easytier::common::config::DEFAULT_CONNECTION_PRIORITY, help = "connection priority; lower values are preferred")] + priority: u32, }, /// Remove a connector Remove { @@ -254,7 +256,11 @@ struct MappedListenerArgs { #[derive(Subcommand, Debug)] enum MappedListenerSubCommand { /// Add Mapped Listerner - Add { url: String }, + Add { + url: String, + #[arg(short = 'p', long = "priority", default_value_t = easytier::common::config::DEFAULT_CONNECTION_PRIORITY, help = "listener priority; lower values are preferred")] + priority: u32, + }, /// Remove Mapped Listener Remove { url: String }, /// List Existing Mapped Listener @@ -1247,6 +1253,7 @@ impl<'a> CommandHandler<'a> { &self, url: &str, action: ConfigPatchAction, + priority: Option, ) -> Result<(), Error> { let url = match action { ConfigPatchAction::Add => Self::connector_validate_url(url)?, @@ -1267,6 +1274,7 @@ impl<'a> CommandHandler<'a> { connectors: vec![UrlPatch { action: action.into(), url: Some(url.into()), + priority, }], ..Default::default() }), @@ -1281,11 +1289,12 @@ impl<'a> CommandHandler<'a> { &self, url: &str, action: ConfigPatchAction, + priority: Option, ) -> Result<(), Error> { let url = url.to_string(); self.apply_to_instances(|handler| { let url = url.clone(); - Box::pin(async move { handler.apply_connector_modify(&url, action).await }) + Box::pin(async move { handler.apply_connector_modify(&url, action, priority).await }) }) .await } @@ -1309,6 +1318,8 @@ impl<'a> CommandHandler<'a> { tx_bytes: String, #[tabled(rename = "tunnel")] tunnel_proto: String, + #[tabled(rename = "prio")] + priority: String, #[tabled(rename = "NAT")] nat_type: String, #[tabled(skip)] @@ -1338,6 +1349,10 @@ impl<'a> CommandHandler<'a> { rx_bytes: format_size(p.get_rx_bytes().unwrap_or(0), humansize::DECIMAL), tx_bytes: format_size(p.get_tx_bytes().unwrap_or(0), humansize::DECIMAL), tunnel_proto: p.get_conn_protos().unwrap_or_default().join(","), + priority: p + .get_conn_priority() + .map(|priority| priority.to_string()) + .unwrap_or_else(|| "-".to_string()), nat_type: p.get_udp_nat_type(), id: route.peer_id.to_string(), version: if route.version.is_empty() { @@ -1363,6 +1378,7 @@ impl<'a> CommandHandler<'a> { rx_bytes: "-".to_string(), tx_bytes: "-".to_string(), tunnel_proto: "-".to_string(), + priority: "-".to_string(), nat_type: if let Some(info) = p.stun_info { info.udp_nat_type().as_str_name().to_string() } else { @@ -1826,6 +1842,29 @@ impl<'a> CommandHandler<'a> { } async fn handle_connector_list(&self) -> Result<(), Error> { + #[derive(tabled::Tabled, serde::Serialize)] + struct ConnectorTableItem { + url: String, + status: String, + priority: String, + } + + impl From for ConnectorTableItem { + fn from(connector: Connector) -> Self { + Self { + url: connector + .url + .map(Into::::into) + .map(|url| url.to_string()) + .unwrap_or_default(), + status: ConnectorStatus::try_from(connector.status) + .map(|status| format!("{:?}", status)) + .unwrap_or_else(|_| connector.status.to_string()), + priority: connector.priority.to_string(), + } + } + } + let results = self .collect_instance_results(|handler| Box::pin(handler.fetch_connector_list())) .await?; @@ -1833,8 +1872,13 @@ impl<'a> CommandHandler<'a> { return self.print_json_results(results); } self.print_results(&results, |connectors| { - println!("response: {:#?}", connectors); - Ok(()) + let mut items = connectors + .iter() + .cloned() + .map(ConnectorTableItem::from) + .collect::>(); + items.sort_by(|a, b| a.url.cmp(&b.url)); + print_output(&items, self.output_format, &[], &[], self.no_trunc) }) } @@ -1873,6 +1917,7 @@ impl<'a> CommandHandler<'a> { &self, url: &str, action: ConfigPatchAction, + priority: Option, ) -> Result<(), Error> { let url = Self::mapped_listener_validate_url(url)?; let client = self.get_config_client().await?; @@ -1882,6 +1927,7 @@ impl<'a> CommandHandler<'a> { mapped_listeners: vec![UrlPatch { action: action.into(), url: Some(url.into()), + priority, }], ..Default::default() }), @@ -1896,11 +1942,16 @@ impl<'a> CommandHandler<'a> { &self, url: &str, action: ConfigPatchAction, + priority: Option, ) -> Result<(), Error> { let url = url.to_string(); self.apply_to_instances(|handler| { let url = url.clone(); - Box::pin(async move { handler.apply_mapped_listener_modify(&url, action).await }) + Box::pin(async move { + handler + .apply_mapped_listener_modify(&url, action, priority) + .await + }) }) .await } @@ -2883,15 +2934,17 @@ async fn main() -> Result<(), Error> { } }, SubCommand::Connector(conn_args) => match conn_args.sub_command { - Some(ConnectorSubCommand::Add { url }) => { + Some(ConnectorSubCommand::Add { url, priority }) => { handler - .handle_connector_modify(&url, ConfigPatchAction::Add) + .handle_connector_modify(&url, ConfigPatchAction::Add, Some(priority)) .await?; - println!("connector add applied to selected instance(s): {url}"); + println!( + "connector add applied to selected instance(s): {url}, priority: {priority}" + ); } Some(ConnectorSubCommand::Remove { url }) => { handler - .handle_connector_modify(&url, ConfigPatchAction::Remove) + .handle_connector_modify(&url, ConfigPatchAction::Remove, None) .await?; println!("connector remove applied to selected instance(s): {url}"); } @@ -2904,15 +2957,15 @@ async fn main() -> Result<(), Error> { }, SubCommand::MappedListener(mapped_listener_args) => { match mapped_listener_args.sub_command { - Some(MappedListenerSubCommand::Add { url }) => { + Some(MappedListenerSubCommand::Add { url, priority }) => { handler - .handle_mapped_listener_modify(&url, ConfigPatchAction::Add) + .handle_mapped_listener_modify(&url, ConfigPatchAction::Add, Some(priority)) .await?; - println!("add mapped listener: {url}"); + println!("add mapped listener: {url}, priority: {priority}"); } Some(MappedListenerSubCommand::Remove { url }) => { handler - .handle_mapped_listener_modify(&url, ConfigPatchAction::Remove) + .handle_mapped_listener_modify(&url, ConfigPatchAction::Remove, None) .await?; println!("remove mapped listener: {url}"); } diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index 0f009088..edbb6884 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -560,16 +560,39 @@ impl InstanceConfigPatcher { return Ok(()); } let global_ctx = weak_upgrade(&self.global_ctx)?; - let mut current_mapped_listeners = global_ctx.config.get_mapped_listeners(); + let current_mapped_listener_configs = global_ctx.config.get_mapped_listener_configs(); + let mut priority_by_url = current_mapped_listener_configs + .iter() + .map(|listener| (listener.url.clone(), listener.priority)) + .collect::>(); + let mut current_mapped_listeners = current_mapped_listener_configs + .into_iter() + .map(|listener| listener.url) + .collect(); + for patch in &mapped_listeners { + if let (Some(url), Some(priority)) = (&patch.url, patch.priority) { + priority_by_url.insert(url.clone().into(), priority); + } + } let patches = mapped_listeners.into_iter().map(Into::into).collect(); InstanceConfigPatcher::trace_patchables(&patches); crate::proto::api::config::patch_vec(&mut current_mapped_listeners, patches); if current_mapped_listeners.is_empty() { - global_ctx.config.set_mapped_listeners(None); + global_ctx.config.set_mapped_listener_configs(None); } else { + let mapped_listener_configs = current_mapped_listeners + .into_iter() + .map(|url| { + let priority = priority_by_url + .get(&url) + .copied() + .unwrap_or(crate::common::config::DEFAULT_CONNECTION_PRIORITY); + crate::common::config::ListenerConfig::new(url, priority) + }) + .collect(); global_ctx .config - .set_mapped_listeners(Some(current_mapped_listeners)); + .set_mapped_listener_configs(Some(mapped_listener_configs)); } Ok(()) } @@ -590,7 +613,14 @@ impl InstanceConfigPatcher { match ConfigPatchAction::try_from(connector.action) { Ok(ConfigPatchAction::Add) => { tracing::info!("Connector added: {}", url); - conn_manager.add_connector_by_url(url).await?; + conn_manager + .add_connector_by_url_with_priority( + url, + connector + .priority + .unwrap_or(crate::common::config::DEFAULT_CONNECTION_PRIORITY), + ) + .await?; } Ok(ConfigPatchAction::Remove) => { tracing::info!("Connector removed: {}", url); @@ -742,7 +772,7 @@ impl Instance { async fn add_initial_peers(&self) -> Result<(), Error> { for peer in self.global_ctx.config.get_peers().iter() { self.get_conn_manager() - .add_connector_by_url(peer.uri.clone()) + .add_connector_by_url_with_priority(peer.uri.clone(), peer.priority) .await?; } Ok(()) @@ -1228,11 +1258,12 @@ impl Instance { _request: ListMappedListenerRequest, ) -> Result { let mut ret = ListMappedListenerResponse::default(); - let urls = weak_upgrade(&self.0)?.config.get_mapped_listeners(); - let mapped_listeners: Vec = urls + let listener_configs = weak_upgrade(&self.0)?.config.get_mapped_listener_configs(); + let mapped_listeners: Vec = listener_configs .into_iter() - .map(|u| MappedListener { - url: Some(u.into()), + .map(|listener| MappedListener { + url: Some(listener.url.into()), + priority: listener.priority, }) .collect(); ret.mappedlisteners = mapped_listeners; diff --git a/easytier/src/instance/listeners.rs b/easytier/src/instance/listeners.rs index 72a1b9af..fe691a1f 100644 --- a/easytier/src/instance/listeners.rs +++ b/easytier/src/instance/listeners.rs @@ -91,6 +91,7 @@ pub type ListenerCreator = Box; struct ListenerFactory { creator_fn: Arc, must_succ: bool, + priority: u32, } pub struct ListenerManager { @@ -125,8 +126,9 @@ impl ListenerManage ) .await?; - for l in self.global_ctx.config.get_listener_uris().iter() { - let l = l.clone(); + for listener_cfg in self.global_ctx.config.get_listener_configs().iter() { + let l = listener_cfg.url.clone(); + let priority = listener_cfg.priority; let Ok(_) = create_listener_by_url(&l, self.global_ctx.clone()) else { let msg = format!("failed to get listener by url: {}, maybe not supported", l); self.global_ctx @@ -136,9 +138,10 @@ impl ListenerManage let ctx = self.global_ctx.clone(); let listener = l.clone(); - self.add_listener( + self.add_listener_with_priority( move || create_listener_by_url(&listener, ctx.clone()).unwrap(), true, + priority, ) .await?; @@ -153,9 +156,10 @@ impl ListenerManage .set_host(Some("[::]".to_string().as_str())) .with_context(|| format!("failed to set ipv6 host for listener: {}", l))?; let ctx = self.global_ctx.clone(); - self.add_listener( + self.add_listener_with_priority( move || create_listener_by_url(&ipv6_listener, ctx.clone()).unwrap(), false, + priority, ) .await?; } @@ -168,10 +172,25 @@ impl ListenerManage &mut self, creator: C, must_succ: bool, + ) -> Result<(), Error> { + self.add_listener_with_priority( + creator, + must_succ, + crate::common::config::DEFAULT_CONNECTION_PRIORITY, + ) + .await + } + + pub async fn add_listener_with_priority( + &mut self, + creator: C, + must_succ: bool, + priority: u32, ) -> Result<(), Error> { self.listeners.push(ListenerFactory { creator_fn: Arc::new(Box::new(creator)), must_succ, + priority, }); Ok(()) } @@ -181,6 +200,7 @@ impl ListenerManage creator: Arc, peer_manager: Weak, global_ctx: ArcGlobalCtx, + priority: u32, ) { let mut err_count = 0; loop { @@ -189,7 +209,7 @@ impl ListenerManage match l.listen().await { Ok(_) => { err_count = 0; - global_ctx.add_running_listener(l.local_url()); + global_ctx.add_running_listener_with_priority(l.local_url(), priority); global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url())); } Err(e) => { @@ -270,6 +290,7 @@ impl ListenerManage listener.creator_fn.clone(), self.peer_manager.clone(), self.global_ctx.clone(), + listener.priority, )); } diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index eff846ee..da277769 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -551,6 +551,7 @@ impl NetworkConfig { format!("failed to parse public server uri: {}", public_server_url) })?, peer_public_key: None, + priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY, }]); } NetworkingMethod::Manual => { @@ -564,6 +565,7 @@ impl NetworkConfig { .parse() .with_context(|| format!("failed to parse peer uri: {}", peer_url))?, peer_public_key: None, + priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY, }); } if !peers.is_empty() { @@ -1109,6 +1111,7 @@ mod tests { peers.push(crate::common::config::PeerConfig { uri, peer_public_key: None, + priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY, }); } config.set_peers(peers); diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 31aef0d6..c3b80e6c 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -279,7 +279,7 @@ impl ForeignNetworkEntry { flags.disable_relay_quic = !global_ctx.get_flags().enable_relay_foreign_network_quic; config.set_flags(flags); - config.set_mapped_listeners(Some(global_ctx.config.get_mapped_listeners())); + config.set_mapped_listener_configs(Some(global_ctx.config.get_mapped_listener_configs())); let foreign_global_ctx = Arc::new(GlobalCtx::new(config)); foreign_global_ctx @@ -291,8 +291,8 @@ impl ForeignNetworkEntry { Self::desired_avoid_relay_data_feature_flag(&global_ctx, relay_data); foreign_global_ctx.set_base_advertised_feature_flags(feature_flag); - for u in global_ctx.get_running_listeners().into_iter() { - foreign_global_ctx.add_running_listener(u); + for listener in global_ctx.get_running_listener_configs().into_iter() { + foreign_global_ctx.add_running_listener_with_priority(listener.url, listener.priority); } foreign_global_ctx diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index 63af93f8..c46e8001 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -188,23 +188,37 @@ impl Peer { async fn select_conn(&self) -> Option { let default_conn_id = self.default_conn_id.load(); - if let Some(conn) = self.conns.get(&default_conn_id) { - return Some(conn.clone()); - } + let default_conn = self.conns.get(&default_conn_id).and_then(|conn| { + if conn.is_closed() { + None + } else { + Some((conn.priority(), conn.clone())) + } + }); - // find a conn with the smallest latency - let mut min_latency = u64::MAX; - for conn in self.conns.iter() { - let latency = conn.value().get_stats().latency_us; - if latency < min_latency { - min_latency = latency; - self.default_conn_id.store(conn.get_conn_id()); + if let Some((default_priority, default_conn)) = default_conn { + let has_better_conn = self.conns.iter().any(|conn| { + !conn.value().is_closed() && conn.value().priority() < default_priority + }); + if !has_better_conn { + return Some(default_conn); } } - self.conns - .get(&self.default_conn_id.load()) - .map(|conn| conn.clone()) + // Prefer lower listener priority first, then use latency to pick the + // initial connection within that priority class. Keep the selected conn + // sticky until a strictly better priority appears. + let selected_conn_id = self + .conns + .iter() + .filter(|conn| !conn.value().is_closed()) + .min_by_key(|conn| (conn.value().priority(), conn.value().get_stats().latency_us)) + .map(|conn| conn.get_conn_id()); + + selected_conn_id.and_then(|conn_id| { + self.default_conn_id.store(conn_id); + self.conns.get(&conn_id).map(|conn| conn.clone()) + }) } pub async fn send_msg(&self, msg: ZCPacket) -> Result<(), Error> { @@ -249,12 +263,26 @@ impl Peer { self.conns.iter().any(|entry| !entry.value().is_closed()) } + pub fn has_conn_with_priority_at_most(&self, priority: u32) -> bool { + self.conns + .iter() + .any(|entry| !entry.value().is_closed() && entry.value().priority() <= priority) + } + pub fn has_directly_connected_conn(&self) -> bool { self.conns .iter() .any(|entry| !entry.value().is_closed() && !entry.value().is_hole_punched()) } + pub fn has_directly_connected_conn_with_priority_at_most(&self, priority: u32) -> bool { + self.conns.iter().any(|entry| { + !entry.value().is_closed() + && !entry.value().is_hole_punched() + && entry.value().priority() <= priority + }) + } + pub fn get_directly_connections(&self) -> DashSet { self.conns .iter() @@ -298,7 +326,7 @@ mod tests { use crate::{ common::{ - config::{NetworkIdentity, PeerConfig}, + config::{DEFAULT_CONNECTION_PRIORITY, NetworkIdentity, PeerConfig}, global_ctx::{GlobalCtx, tests::get_mock_global_ctx}, new_peer_id, }, @@ -380,6 +408,87 @@ mod tests { close_handler.await.unwrap().unwrap(); } + #[tokio::test] + async fn select_conn_prefers_lower_priority_before_latency() { + let (packet_send, _packet_recv) = create_packet_recv_chan(); + let global_ctx = get_mock_global_ctx(); + let local_peer_id = new_peer_id(); + let remote_peer_id = new_peer_id(); + let peer = Peer::new(remote_peer_id, packet_send, global_ctx.clone()); + let ps = Arc::new(PeerSessionStore::new()); + + let (low_client_tunnel, low_server_tunnel) = create_ring_tunnel_pair(); + let mut low_client_conn = PeerConn::new( + local_peer_id, + global_ctx.clone(), + low_client_tunnel, + ps.clone(), + ); + low_client_conn.set_priority(100); + let low_conn_id = low_client_conn.get_conn_id(); + let mut low_server_conn = PeerConn::new( + remote_peer_id, + global_ctx.clone(), + low_server_tunnel, + ps.clone(), + ); + let (client_ret, server_ret) = tokio::join!( + low_client_conn.do_handshake_as_client(), + low_server_conn.do_handshake_as_server() + ); + client_ret.unwrap(); + server_ret.unwrap(); + peer.add_peer_conn(low_client_conn).await.unwrap(); + assert_eq!(peer.select_conn().await.unwrap().get_conn_id(), low_conn_id); + + let (same_priority_client_tunnel, same_priority_server_tunnel) = create_ring_tunnel_pair(); + let mut same_priority_client_conn = PeerConn::new( + local_peer_id, + global_ctx.clone(), + same_priority_client_tunnel, + ps.clone(), + ); + same_priority_client_conn.set_priority(100); + let mut same_priority_server_conn = PeerConn::new( + remote_peer_id, + global_ctx.clone(), + same_priority_server_tunnel, + ps.clone(), + ); + let (client_ret, server_ret) = tokio::join!( + same_priority_client_conn.do_handshake_as_client(), + same_priority_server_conn.do_handshake_as_server() + ); + client_ret.unwrap(); + server_ret.unwrap(); + peer.add_peer_conn(same_priority_client_conn).await.unwrap(); + assert_eq!(peer.select_conn().await.unwrap().get_conn_id(), low_conn_id); + + let (high_client_tunnel, high_server_tunnel) = create_ring_tunnel_pair(); + let mut high_client_conn = PeerConn::new( + local_peer_id, + global_ctx.clone(), + high_client_tunnel, + ps.clone(), + ); + high_client_conn.set_priority(DEFAULT_CONNECTION_PRIORITY); + let high_conn_id = high_client_conn.get_conn_id(); + let mut high_server_conn = + PeerConn::new(remote_peer_id, global_ctx, high_server_tunnel, ps); + let (client_ret, server_ret) = tokio::join!( + high_client_conn.do_handshake_as_client(), + high_server_conn.do_handshake_as_server() + ); + client_ret.unwrap(); + server_ret.unwrap(); + peer.add_peer_conn(high_client_conn).await.unwrap(); + + assert_eq!( + peer.select_conn().await.unwrap().get_conn_id(), + high_conn_id + ); + } + #[tokio::test] async fn reject_peer_conn_with_mismatched_identity_type() { let (packet_send, _packet_recv) = create_packet_recv_chan(); @@ -423,6 +532,7 @@ mod tests { .local_public_key .unwrap(), ), + priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY, }]); let mut shared_client_conn = PeerConn::new( local_peer_id, diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 5e2b6d55..641a6d42 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -37,7 +37,7 @@ use crate::utils::BoxExt; use crate::{ common::{ PeerId, - config::{NetworkIdentity, NetworkSecretDigest}, + config::{DEFAULT_CONNECTION_PRIORITY, NetworkIdentity, NetworkSecretDigest}, error::Error, global_ctx::ArcGlobalCtx, }, @@ -305,6 +305,7 @@ pub struct PeerConn { // remote or local is_hole_punched: bool, + priority: u32, close_event_notifier: Arc, @@ -393,6 +394,7 @@ impl PeerConn { is_client: None, is_hole_punched: true, + priority: DEFAULT_CONNECTION_PRIORITY, close_event_notifier: Arc::new(PeerConnCloseNotify::new(conn_id)), @@ -442,6 +444,14 @@ impl PeerConn { self.is_hole_punched } + pub fn set_priority(&mut self, priority: u32) { + self.priority = priority; + } + + pub fn priority(&self) -> u32 { + self.priority + } + pub fn is_closed(&self) -> bool { self.close_event_notifier.is_closed() } @@ -529,6 +539,7 @@ impl PeerConn { version: VERSION, features: Vec::new(), network_name: network.network_name.clone(), + connection_priority: self.priority, ..Default::default() }; @@ -821,6 +832,7 @@ impl PeerConn { a_session_generation, a_conn_id: Some(a_conn_id.into()), client_encryption_algorithm: self.my_encrypt_algo.clone(), + connection_priority: self.priority, }; let mut hs = builder @@ -1072,6 +1084,7 @@ impl PeerConn { Some(&mut hs), first_msg1, )?; + self.priority = msg1_pb.connection_priority; let remote_network_name = msg1_pb.a_network_name.clone(); self.record_control_rx(&remote_network_name, first_msg1_len); @@ -1227,6 +1240,7 @@ impl PeerConn { features: Vec::new(), network_secret_digest: noise.secret_digest.clone(), + connection_priority: self.priority, } } @@ -1264,6 +1278,7 @@ impl PeerConn { self.is_client = Some(false); } else if hdr.packet_type == PacketType::HandShake as u8 { let rsp = Self::decode_handshake_packet(&first_pkt)?; + self.priority = rsp.connection_priority; handshake_recved(self, &rsp.network_name)?; tracing::info!("handshake request: {:?}", rsp); self.record_control_rx(&rsp.network_name, first_pkt.buf_len() as u64); @@ -1566,6 +1581,7 @@ impl PeerConn { .as_ref() .map(|x| x.peer_identity_type as i32) .unwrap_or(PeerIdentityType::Admin as i32), + priority: self.priority, } } @@ -2088,6 +2104,7 @@ pub mod tests { .local_public_key .unwrap(), ), + priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY, }]); let ps = Arc::new(PeerSessionStore::new()); diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index c5d03fdd..a17398b6 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -22,6 +22,7 @@ use crate::{ common::{ PeerId, compressor::{Compressor as _, DefaultCompressor}, + config::DEFAULT_CONNECTION_PRIORITY, constants::EASYTIER_VERSION, error::Error, global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, @@ -31,6 +32,7 @@ use crate::{ }, peers::{ PeerPacketFilter, + peer::Peer, peer_conn::PeerConn, peer_rpc::PeerRpcManagerTransport, peer_session::PeerSessionStore, @@ -594,6 +596,22 @@ impl PeerManager { tunnel: Box, is_directly_connected: bool, peer_id_hint: Option, + ) -> Result<(PeerId, PeerConnId), Error> { + self.add_client_tunnel_with_peer_id_hint_and_priority( + tunnel, + is_directly_connected, + peer_id_hint, + DEFAULT_CONNECTION_PRIORITY, + ) + .await + } + + pub(crate) async fn add_client_tunnel_with_peer_id_hint_and_priority( + &self, + tunnel: Box, + is_directly_connected: bool, + peer_id_hint: Option, + priority: u32, ) -> Result<(PeerId, PeerConnId), Error> { let mut peer = PeerConn::new_with_peer_id_hint( self.my_peer_id, @@ -602,6 +620,7 @@ impl PeerManager { peer_id_hint, self.peer_session_store.clone(), ); + peer.set_priority(priority); peer.set_is_hole_punched(!is_directly_connected); peer.do_handshake_as_client().await?; let conn_id = peer.get_conn_id(); @@ -616,6 +635,14 @@ impl PeerManager { Ok((peer_id, conn_id)) } + fn get_peer_by_id(&self, peer_id: PeerId) -> Option> { + self.peers.get_peer_by_id(peer_id).or_else(|| { + self.foreign_network_client + .get_peer_map() + .get_peer_by_id(peer_id) + }) + } + pub fn has_directly_connected_conn(&self, peer_id: PeerId) -> bool { if let Some(peer) = self.peers.get_peer_by_id(peer_id) { peer.has_directly_connected_conn() @@ -624,6 +651,20 @@ impl PeerManager { } } + pub(crate) fn has_directly_connected_conn_with_priority_at_most( + &self, + peer_id: PeerId, + priority: u32, + ) -> bool { + self.get_peer_by_id(peer_id) + .is_some_and(|peer| peer.has_directly_connected_conn_with_priority_at_most(priority)) + } + + pub(crate) fn has_conn_with_priority_at_most(&self, peer_id: PeerId, priority: u32) -> bool { + self.get_peer_by_id(peer_id) + .is_some_and(|peer| peer.has_conn_with_priority_at_most(priority)) + } + #[tracing::instrument] pub async fn try_direct_connect(&self, connector: C) -> Result<(PeerId, PeerConnId), Error> where @@ -642,11 +683,12 @@ impl PeerManager { where C: TunnelConnector + Debug, { + let priority = connector.priority(); let ns = self.global_ctx.net_ns.clone(); let t = ns .run_async(|| async move { connector.connect().await }) .await?; - self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint) + self.add_client_tunnel_with_peer_id_hint_and_priority(t, true, peer_id_hint, priority) .await } @@ -3035,6 +3077,7 @@ mod tests { crate::common::config::PeerConfig { uri: server_remote_url, peer_public_key: Some(server_pub_b64.clone()), + priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY, }, ]); diff --git a/easytier/src/peers/peer_rpc_service.rs b/easytier/src/peers/peer_rpc_service.rs index 08e82d2d..6a855d12 100644 --- a/easytier/src/peers/peer_rpc_service.rs +++ b/easytier/src/peers/peer_rpc_service.rs @@ -5,7 +5,8 @@ use crate::{ proto::{ common::Void, peer_rpc::{ - DirectConnectorRpc, GetIpListRequest, GetIpListResponse, SendUdpHolePunchPacketRequest, + DirectConnectorRpc, GetIpListRequest, GetIpListResponse, ListenerInfo, + SendUdpHolePunchPacketRequest, }, rpc_types::{self, controller::BaseController}, }, @@ -44,13 +45,23 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer { _: GetIpListRequest, ) -> rpc_types::error::Result { let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await; - ret.listeners = self + let listener_configs = self .global_ctx .config - .get_mapped_listeners() + .get_mapped_listener_configs() .into_iter() - .chain(self.global_ctx.get_running_listeners()) - .map(Into::into) + .chain(self.global_ctx.get_running_listener_configs()) + .collect::>(); + ret.listeners = listener_configs + .iter() + .map(|listener| listener.url.clone().into()) + .collect(); + ret.listener_infos = listener_configs + .into_iter() + .map(|listener| ListenerInfo { + url: Some(listener.url.into()), + priority: listener.priority, + }) .collect(); remove_easytier_managed_ipv6s(&mut ret, &self.global_ctx); tracing::trace!( diff --git a/easytier/src/proto/api.rs b/easytier/src/proto/api.rs index d3b81be6..e8cc157c 100644 --- a/easytier/src/proto/api.rs +++ b/easytier/src/proto/api.rs @@ -141,6 +141,21 @@ pub mod instance { ret } + pub fn get_conn_priority(&self) -> Option { + let p = self.peer.as_ref()?; + let default_conn_id = p.default_conn_id.map(|id| id.to_string()); + let mut ret = None; + for conn in p.conns.iter() { + if default_conn_id == Some(conn.conn_id.to_string()) { + return Some(conn.priority); + } + + ret.get_or_insert(conn.priority); + } + + ret + } + fn get_tunnel_proto_str(tunnel_info: &super::super::common::TunnelInfo) -> String { tunnel_info.display_tunnel_type() } diff --git a/easytier/src/proto/api_config.proto b/easytier/src/proto/api_config.proto index ccfa30cc..eddc8423 100644 --- a/easytier/src/proto/api_config.proto +++ b/easytier/src/proto/api_config.proto @@ -43,6 +43,7 @@ message StringPatch { message UrlPatch { ConfigPatchAction action = 1; common.Url url = 2; + optional uint32 priority = 3; } message AclPatch { diff --git a/easytier/src/proto/api_instance.proto b/easytier/src/proto/api_instance.proto index 12fe18f6..45355c48 100644 --- a/easytier/src/proto/api_instance.proto +++ b/easytier/src/proto/api_instance.proto @@ -45,6 +45,7 @@ message PeerConnInfo { bytes noise_remote_static_pubkey = 12; peer_rpc.SecureAuthLevel secure_auth_level = 13; peer_rpc.PeerIdentityType peer_identity_type = 14; + uint32 priority = 15; } message PeerInfo { @@ -208,6 +209,7 @@ enum ConnectorStatus { message Connector { common.Url url = 1; ConnectorStatus status = 2; + uint32 priority = 3; } message ListConnectorRequest { InstanceIdentifier instance = 1; } @@ -218,7 +220,10 @@ service ConnectorManageRpc { rpc ListConnector(ListConnectorRequest) returns (ListConnectorResponse); } -message MappedListener { common.Url url = 1; } +message MappedListener { + common.Url url = 1; + uint32 priority = 2; +} message ListMappedListenerRequest { InstanceIdentifier instance = 1; } diff --git a/easytier/src/proto/peer_rpc.proto b/easytier/src/proto/peer_rpc.proto index 23dc417a..51b82ae2 100644 --- a/easytier/src/proto/peer_rpc.proto +++ b/easytier/src/proto/peer_rpc.proto @@ -184,6 +184,12 @@ message GetIpListResponse { common.Ipv6Addr public_ipv6 = 3; repeated common.Ipv6Addr interface_ipv6s = 4; repeated common.Url listeners = 5; + repeated ListenerInfo listener_infos = 6; +} + +message ListenerInfo { + common.Url url = 1; + uint32 priority = 2; } message SendUdpHolePunchPacketRequest { @@ -314,6 +320,7 @@ message HandshakeRequest { repeated string features = 4; string network_name = 5; bytes network_secret_digest = 6; + uint32 connection_priority = 7; } message KcpConnData { @@ -346,6 +353,7 @@ message PeerConnNoiseMsg1Pb { optional uint32 a_session_generation = 3; common.UUID a_conn_id = 4; string client_encryption_algorithm = 5; + uint32 connection_priority = 6; } message PeerConnNoiseMsg2Pb { diff --git a/easytier/src/tunnel/mod.rs b/easytier/src/tunnel/mod.rs index 06629b6c..6c29ab8a 100644 --- a/easytier/src/tunnel/mod.rs +++ b/easytier/src/tunnel/mod.rs @@ -3,7 +3,7 @@ use std::{ }; use crate::{ - common::{dns::socket_addrs, error::Error}, + common::{config::DEFAULT_CONNECTION_PRIORITY, dns::socket_addrs, error::Error}, proto::common::TunnelInfo, }; use async_trait::async_trait; @@ -139,11 +139,53 @@ pub trait TunnelListener: Send { pub trait TunnelConnector: Send { async fn connect(&mut self) -> Result, TunnelError>; fn remote_url(&self) -> url::Url; + fn priority(&self) -> u32 { + DEFAULT_CONNECTION_PRIORITY + } fn set_bind_addrs(&mut self, _addrs: Vec) {} fn set_ip_version(&mut self, _ip_version: IpVersion) {} fn set_resolved_addr(&mut self, _addr: SocketAddr) {} } +#[derive(Debug)] +pub struct PrioritizedConnector { + inner: C, + priority: u32, +} + +impl PrioritizedConnector { + pub fn new(inner: C, priority: u32) -> Self { + Self { inner, priority } + } +} + +#[async_trait] +impl TunnelConnector for PrioritizedConnector { + async fn connect(&mut self) -> Result, TunnelError> { + self.inner.connect().await + } + + fn remote_url(&self) -> url::Url { + self.inner.remote_url() + } + + fn priority(&self) -> u32 { + self.priority + } + + fn set_bind_addrs(&mut self, addrs: Vec) { + self.inner.set_bind_addrs(addrs); + } + + fn set_ip_version(&mut self, ip_version: IpVersion) { + self.inner.set_ip_version(ip_version); + } + + fn set_resolved_addr(&mut self, addr: SocketAddr) { + self.inner.set_resolved_addr(addr); + } +} + pub fn build_url_from_socket_addr(addr: &String, scheme: &str) -> url::Url { if let Ok(sock_addr) = addr.parse::() { let url_str = format!("{}://0.0.0.0", scheme);