feat: support listener connection priorities

This commit is contained in:
fanyang
2026-05-02 17:38:37 +08:00
parent 362aa7a9cd
commit e072587721
22 changed files with 863 additions and 145 deletions
+132 -5
View File
@@ -28,6 +28,55 @@ use super::env_parser;
pub type Flags = crate::proto::common::FlagsInConfig;
pub const DEFAULT_CONNECTION_PRIORITY: u32 = 0;
// Deserialization helper: accepts either the legacy bare-URL form or the
// structured `{ url, priority }` table form for a listener entry.
// `#[serde(untagged)]` tries each variant in order until one matches.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(untagged)]
enum ListenerConfigDef {
// Legacy form: a plain URL; the priority falls back to the default when
// converted into `ListenerConfig`.
Url(url::Url),
// Structured form: explicit URL plus an optional priority
// (`#[serde(default)]` makes the field optional in the config file).
Config {
url: url::Url,
#[serde(default)]
priority: u32,
},
}
// Canonical listener entry: a URL plus its connection priority.
// Deserialized via `ListenerConfigDef` (`#[serde(from = …)]`) so both the
// legacy bare-URL form and the structured table form are accepted.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(from = "ListenerConfigDef")]
pub struct ListenerConfig {
pub url: url::Url,
// Connection priority; lower values are preferred (see the CLI help text
// and DEFAULT_CONNECTION_PRIORITY).
pub priority: u32,
}
impl ListenerConfig {
pub fn new(url: url::Url, priority: u32) -> Self {
Self { url, priority }
}
pub fn with_default_priority(url: url::Url) -> Self {
Self::new(url, DEFAULT_CONNECTION_PRIORITY)
}
}
// Convenience conversion for call sites that only have a URL; assigns the
// default connection priority via `with_default_priority`.
impl From<url::Url> for ListenerConfig {
fn from(url: url::Url) -> Self {
Self::with_default_priority(url)
}
}
impl From<ListenerConfigDef> for ListenerConfig {
fn from(def: ListenerConfigDef) -> Self {
match def {
ListenerConfigDef::Url(url) => Self::with_default_priority(url),
ListenerConfigDef::Config { url, priority } => Self::new(url, priority),
}
}
}
// Strip priorities, keeping only the URLs, for callers that still speak the
// plain-URL listener API.
fn listener_config_urls(listeners: Vec<ListenerConfig>) -> Vec<url::Url> {
    let mut urls = Vec::with_capacity(listeners.len());
    for listener in listeners {
        urls.push(listener.url);
    }
    urls
}
pub fn gen_default_flags() -> Flags {
#[allow(deprecated)]
Flags {
@@ -196,6 +245,7 @@ pub trait ConfigLoader: Send + Sync {
fn set_network_identity(&self, identity: NetworkIdentity);
fn get_listener_uris(&self) -> Vec<url::Url>;
fn get_listener_configs(&self) -> Vec<ListenerConfig>;
fn get_peers(&self) -> Vec<PeerConfig>;
fn set_peers(&self, peers: Vec<PeerConfig>);
@@ -205,6 +255,8 @@ pub trait ConfigLoader: Send + Sync {
fn get_mapped_listeners(&self) -> Vec<url::Url>;
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>);
fn get_mapped_listener_configs(&self) -> Vec<ListenerConfig>;
fn set_mapped_listener_configs(&self, listeners: Option<Vec<ListenerConfig>>);
fn get_vpn_portal_config(&self) -> Option<VpnPortalConfig>;
fn set_vpn_portal_config(&self, config: VpnPortalConfig);
@@ -403,6 +455,8 @@ impl Default for NetworkIdentity {
pub struct PeerConfig {
pub uri: url::Url,
pub peer_public_key: Option<String>,
#[serde(default)]
pub priority: u32,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
@@ -534,8 +588,8 @@ struct Config {
ipv6_public_addr_prefix: Option<String>,
dhcp: Option<bool>,
network_identity: Option<NetworkIdentity>,
listeners: Option<Vec<url::Url>>,
mapped_listeners: Option<Vec<url::Url>>,
listeners: Option<Vec<ListenerConfig>>,
mapped_listeners: Option<Vec<ListenerConfig>>,
exit_nodes: Option<Vec<IpAddr>>,
peer: Option<Vec<PeerConfig>>,
@@ -849,6 +903,10 @@ impl ConfigLoader for TomlConfigLoader {
}
fn get_listener_uris(&self) -> Vec<url::Url> {
listener_config_urls(self.get_listener_configs())
}
fn get_listener_configs(&self) -> Vec<ListenerConfig> {
self.config
.lock()
.unwrap()
@@ -866,14 +924,29 @@ impl ConfigLoader for TomlConfigLoader {
}
fn get_listeners(&self) -> Option<Vec<url::Url>> {
self.config.lock().unwrap().listeners.clone()
self.config
.lock()
.unwrap()
.listeners
.clone()
.map(listener_config_urls)
}
fn set_listeners(&self, listeners: Vec<url::Url>) {
self.config.lock().unwrap().listeners = Some(listeners);
self.config.lock().unwrap().listeners =
Some(listeners.into_iter().map(Into::into).collect());
}
fn get_mapped_listeners(&self) -> Vec<url::Url> {
listener_config_urls(self.get_mapped_listener_configs())
}
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>) {
self.config.lock().unwrap().mapped_listeners =
listeners.map(|listeners| listeners.into_iter().map(Into::into).collect());
}
fn get_mapped_listener_configs(&self) -> Vec<ListenerConfig> {
self.config
.lock()
.unwrap()
@@ -882,7 +955,7 @@ impl ConfigLoader for TomlConfigLoader {
.unwrap_or_default()
}
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>) {
fn set_mapped_listener_configs(&self, listeners: Option<Vec<ListenerConfig>>) {
self.config.lock().unwrap().mapped_listeners = listeners;
}
@@ -1402,6 +1475,60 @@ members = ["admin"]
assert_eq!(group.members, vec!["admin"]);
}
#[test]
// Verifies that `listeners`, `mapped_listeners`, and `[[peer]]` entries all
// accept both the legacy bare-URL form and the structured form with an
// explicit `priority`, that omitted priorities fall back to
// DEFAULT_CONNECTION_PRIORITY, and that priorities survive a dump/reload
// round trip.
fn test_listener_priority_config_supports_old_and_structured_values() {
let config = TomlConfigLoader::new_from_str(
r#"
listeners = [
"tcp://0.0.0.0:11010",
{ url = "udp://0.0.0.0:11010", priority = 80 },
]
mapped_listeners = [
"tcp://example.com:11010",
{ url = "tcp://frps.example.com:30001", priority = 100 },
]
[[peer]]
uri = "tcp://proxy.example.com:443"
priority = 100
[[peer]]
uri = "tcp://normal.example.com:11010"
"#,
)
.unwrap();
// Legacy entry gets the default priority; structured entry keeps its own.
let listeners = config.get_listener_configs();
assert_eq!(listeners[0].url.to_string(), "tcp://0.0.0.0:11010");
assert_eq!(listeners[0].priority, DEFAULT_CONNECTION_PRIORITY);
assert_eq!(listeners[1].url.to_string(), "udp://0.0.0.0:11010");
assert_eq!(listeners[1].priority, 80);
// Same behavior for mapped listeners.
let mapped_listeners = config.get_mapped_listener_configs();
assert_eq!(
mapped_listeners[0].url.to_string(),
"tcp://example.com:11010"
);
assert_eq!(mapped_listeners[0].priority, DEFAULT_CONNECTION_PRIORITY);
assert_eq!(
mapped_listeners[1].url.to_string(),
"tcp://frps.example.com:30001"
);
assert_eq!(mapped_listeners[1].priority, 100);
// Peers: explicit priority is honored, omitted priority defaults.
let peers = config.get_peers();
assert_eq!(peers[0].uri.to_string(), "tcp://proxy.example.com:443");
assert_eq!(peers[0].priority, 100);
assert_eq!(peers[1].uri.to_string(), "tcp://normal.example.com:11010");
assert_eq!(peers[1].priority, DEFAULT_CONNECTION_PRIORITY);
// Round trip: dumping and reloading must preserve all priorities.
let dumped = config.dump();
let reloaded = TomlConfigLoader::new_from_str(&dumped).unwrap();
assert_eq!(reloaded.get_listener_configs(), listeners);
assert_eq!(reloaded.get_mapped_listener_configs(), mapped_listeners);
assert_eq!(reloaded.get_peers(), peers);
}
#[test]
fn test_network_config_source_user_is_implicit() {
let config = TomlConfigLoader::default();
+22 -9
View File
@@ -11,7 +11,7 @@ use dashmap::DashMap;
use super::{
PeerId,
config::{ConfigLoader, Flags},
config::{ConfigLoader, DEFAULT_CONNECTION_PRIORITY, Flags, ListenerConfig},
netns::NetNS,
network::IPCollector,
stun::{StunInfoCollector, StunInfoCollectorTrait},
@@ -212,7 +212,7 @@ pub struct GlobalCtx {
stun_info_collection: Mutex<Arc<dyn StunInfoCollectorTrait>>,
running_listeners: Mutex<Vec<url::Url>>,
running_listeners: Mutex<Vec<ListenerConfig>>,
advertised_ipv6_public_addr_prefix: Mutex<Option<cidr::Ipv6Cidr>>,
flags: ArcSwap<Flags>,
@@ -509,13 +509,28 @@ impl GlobalCtx {
}
pub fn get_running_listeners(&self) -> Vec<url::Url> {
self.running_listeners
.lock()
.unwrap()
.iter()
.map(|listener| listener.url.clone())
.collect()
}
pub fn get_running_listener_configs(&self) -> Vec<ListenerConfig> {
self.running_listeners.lock().unwrap().clone()
}
pub fn add_running_listener(&self, url: url::Url) {
self.add_running_listener_with_priority(url, DEFAULT_CONNECTION_PRIORITY);
}
pub fn add_running_listener_with_priority(&self, url: url::Url, priority: u32) {
let mut l = self.running_listeners.lock().unwrap();
if !l.contains(&url) {
l.push(url);
if let Some(listener) = l.iter_mut().find(|listener| listener.url == url) {
listener.priority = priority;
} else {
l.push(ListenerConfig::new(url, priority));
}
}
@@ -744,11 +759,9 @@ impl GlobalCtx {
}
fn is_port_in_running_listeners(&self, port: u16, is_udp: bool) -> bool {
self.running_listeners
.lock()
.unwrap()
.iter()
.any(|x| x.port() == Some(port) && matches_protocol!(x, Protocol::UDP) == is_udp)
self.running_listeners.lock().unwrap().iter().any(|x| {
x.url.port() == Some(port) && matches_protocol!(&x.url, Protocol::UDP) == is_udp
})
}
#[tracing::instrument(ret, skip(self))]
+233 -54
View File
@@ -13,8 +13,8 @@ use std::{
use crate::{
common::{
PeerId, dns::socket_addrs, error::Error, global_ctx::ArcGlobalCtx,
stun::StunInfoCollectorTrait,
PeerId, config::DEFAULT_CONNECTION_PRIORITY, dns::socket_addrs, error::Error,
global_ctx::ArcGlobalCtx, stun::StunInfoCollectorTrait,
},
connector::udp_hole_punch::handle_rpc_result,
peers::{
@@ -31,7 +31,7 @@ use crate::{
},
rpc_types::controller::BaseController,
},
tunnel::{IpVersion, matches_protocol, udp::UdpTunnelConnector},
tunnel::{IpVersion, PrioritizedConnector, matches_protocol, udp::UdpTunnelConnector},
use_global_var,
};
@@ -48,6 +48,7 @@ use url::Host;
pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1;
pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
const DIRECT_CONNECTOR_LOW_PRIORITY_RETRY_TIMEOUT_SEC: u64 = 300;
static TESTING: AtomicBool = AtomicBool::new(false);
@@ -131,11 +132,70 @@ struct DstBlackListItem(PeerId, String);
#[derive(Hash, Eq, PartialEq, Clone)]
struct DstListenerUrlBlackListItem(PeerId, String);
// A direct-connect candidate taken from a peer's ip-list response: the
// listener URL plus the priority configured on the remote side.
#[derive(Clone, Debug)]
struct AvailableListener {
url: url::Url,
priority: u32,
}
// Build the list of direct-connect candidates from a peer's ip-list response.
// Priority-aware peers populate `listener_infos` (URL + priority); older
// peers only fill `listeners`, in which case every entry gets the default
// priority. Entries with a "ring" scheme, without a resolvable mapped port,
// without a host, or with an IPv6 host while IPv6 is disabled are dropped.
fn available_listeners_from_ip_list(
ip_list: &GetIpListResponse,
enable_ipv6: bool,
) -> Vec<AvailableListener> {
let candidate_listeners: Vec<AvailableListener> = if ip_list.listener_infos.is_empty() {
// Legacy peer: bare URL list, default priority for everything.
ip_list
.listeners
.iter()
.map(|url| AvailableListener {
url: url.clone().into(),
priority: DEFAULT_CONNECTION_PRIORITY,
})
.collect()
} else {
// Priority-aware peer: take URL + priority pairs, skipping any entry
// whose URL field is missing.
ip_list
.listener_infos
.iter()
.filter_map(|info| {
info.url.as_ref().map(|url| AvailableListener {
url: url.clone().into(),
priority: info.priority,
})
})
.collect()
};
candidate_listeners
.into_iter()
.filter(|l| l.url.scheme() != "ring")
.filter(|l| {
mapped_listener_port(&l.url).is_some()
&& l.url
.host()
.is_some_and(|host| enable_ipv6 || !matches!(host, Host::Ipv6(_)))
})
.collect()
}
// Order candidates so the best one ends up in the LAST slot (callers pop from
// the back): configured priority dominates — lower numeric value wins, hence
// `Reverse` — then the instance's default protocol beats udp, which beats
// everything else.
fn sort_available_listeners(available_listeners: &mut [AvailableListener], default_protocol: &str) {
    available_listeners.sort_by_key(|listener| {
        let scheme = listener.url.scheme();
        let protocol_rank = match scheme {
            s if s == default_protocol => 3,
            "udp" => 2,
            _ => 1,
        };
        (std::cmp::Reverse(listener.priority), protocol_rank)
    });
}
struct DirectConnectorManagerData {
global_ctx: ArcGlobalCtx,
peer_manager: Arc<PeerManager>,
dst_listener_blacklist: timedmap::TimedMap<DstListenerUrlBlackListItem, ()>,
peer_black_list: timedmap::TimedMap<PeerId, ()>,
low_priority_direct_retry_backoff: timedmap::TimedMap<PeerId, ()>,
}
impl DirectConnectorManagerData {
@@ -145,6 +205,7 @@ impl DirectConnectorManagerData {
peer_manager,
dst_listener_blacklist: timedmap::TimedMap::new(),
peer_black_list: timedmap::TimedMap::new(),
low_priority_direct_retry_backoff: timedmap::TimedMap::new(),
}
}
@@ -201,6 +262,7 @@ impl DirectConnectorManagerData {
&self,
dst_peer_id: PeerId,
remote_url: &url::Url,
priority: u32,
) -> Result<(PeerId, PeerConnId), Error> {
let local_socket = Arc::new(
UdpSocket::bind("[::]:0")
@@ -239,7 +301,12 @@ impl DirectConnectorManagerData {
// NOTICE: must add as directly connected tunnel
self.peer_manager
.add_client_tunnel_with_peer_id_hint(ret, true, Some(dst_peer_id))
.add_client_tunnel_with_peer_id_hint_and_priority(
ret,
true,
Some(dst_peer_id),
priority,
)
.await
}
@@ -247,6 +314,7 @@ impl DirectConnectorManagerData {
&self,
dst_peer_id: PeerId,
remote_url: &url::Url,
priority: u32,
) -> Result<(PeerId, PeerConnId), Error> {
let local_socket = {
let _g = self.global_ctx.net_ns.guard();
@@ -275,21 +343,34 @@ impl DirectConnectorManagerData {
.await?;
self.peer_manager
.add_client_tunnel_with_peer_id_hint(ret, true, Some(dst_peer_id))
.add_client_tunnel_with_peer_id_hint_and_priority(
ret,
true,
Some(dst_peer_id),
priority,
)
.await
}
async fn do_try_connect_to_ip(&self, dst_peer_id: PeerId, addr: String) -> Result<(), Error> {
async fn do_try_connect_to_ip(
&self,
dst_peer_id: PeerId,
addr: String,
priority: u32,
) -> Result<(), Error> {
let connector = create_connector_by_url(&addr, &self.global_ctx, IpVersion::Both).await?;
let remote_url = connector.remote_url();
let (peer_id, conn_id) = if matches_scheme!(remote_url, TunnelScheme::Ip(IpScheme::Udp)) {
match remote_url.host() {
Some(Host::Ipv6(_)) => {
self.connect_to_public_ipv6(dst_peer_id, &remote_url)
self.connect_to_public_ipv6(dst_peer_id, &remote_url, priority)
.await?
}
Some(Host::Ipv4(ip)) if is_public_ipv4(ip) => {
match self.connect_to_public_ipv4(dst_peer_id, &remote_url).await {
match self
.connect_to_public_ipv4(dst_peer_id, &remote_url, priority)
.await
{
Ok(ret) => ret,
Err(err) => {
tracing::debug!(
@@ -300,7 +381,7 @@ impl DirectConnectorManagerData {
timeout(
std::time::Duration::from_secs(3),
self.peer_manager.try_direct_connect_with_peer_id_hint(
connector,
PrioritizedConnector::new(connector, priority),
Some(dst_peer_id),
),
)
@@ -311,8 +392,10 @@ impl DirectConnectorManagerData {
_ => {
timeout(
std::time::Duration::from_secs(3),
self.peer_manager
.try_direct_connect_with_peer_id_hint(connector, Some(dst_peer_id)),
self.peer_manager.try_direct_connect_with_peer_id_hint(
PrioritizedConnector::new(connector, priority),
Some(dst_peer_id),
),
)
.await??
}
@@ -320,8 +403,10 @@ impl DirectConnectorManagerData {
} else {
timeout(
std::time::Duration::from_secs(3),
self.peer_manager
.try_direct_connect_with_peer_id_hint(connector, Some(dst_peer_id)),
self.peer_manager.try_direct_connect_with_peer_id_hint(
PrioritizedConnector::new(connector, priority),
Some(dst_peer_id),
),
)
.await??
};
@@ -345,6 +430,7 @@ impl DirectConnectorManagerData {
self: Arc<DirectConnectorManagerData>,
dst_peer_id: PeerId,
addr: String,
priority: u32,
) -> Result<(), Error> {
let mut rand_gen = rand::rngs::OsRng;
let backoff_ms = [1000, 2000, 4000];
@@ -361,19 +447,26 @@ impl DirectConnectorManagerData {
return Err(Error::UrlInBlacklist);
}
let has_good_direct_conn = || {
self.peer_manager
.has_directly_connected_conn_with_priority_at_most(dst_peer_id, priority)
};
loop {
if self.peer_manager.has_directly_connected_conn(dst_peer_id) {
if has_good_direct_conn() {
return Ok(());
}
tracing::debug!(?dst_peer_id, ?addr, "try_connect_to_ip start one round");
let ret = self.do_try_connect_to_ip(dst_peer_id, addr.clone()).await;
let ret = self
.do_try_connect_to_ip(dst_peer_id, addr.clone(), priority)
.await;
tracing::debug!(?ret, ?dst_peer_id, ?addr, "try_connect_to_ip return");
if ret.is_ok() {
return Ok(());
}
if self.peer_manager.has_directly_connected_conn(dst_peer_id) {
if has_good_direct_conn() {
return Ok(());
}
@@ -404,17 +497,19 @@ impl DirectConnectorManagerData {
self: &Arc<DirectConnectorManagerData>,
dst_peer_id: PeerId,
ip_list: &GetIpListResponse,
listener: &url::Url,
listener: &AvailableListener,
tasks: &mut JoinSet<Result<(), Error>>,
) {
let Ok(mut addrs) = resolve_mapped_listener_addrs(listener).await else {
let Ok(mut addrs) = resolve_mapped_listener_addrs(&listener.url).await else {
tracing::error!(?listener, "failed to parse socket address from listener");
return;
};
let listener_host = addrs.pop();
tracing::info!(?listener_host, ?listener, "try direct connect to peer");
let is_udp = matches_protocol!(listener, Protocol::UDP);
let is_udp = matches_protocol!(&listener.url, Protocol::UDP);
let listener_url = &listener.url;
let priority = listener.priority;
// Snapshot running listeners once; used for cheap port pre-checks before the
// expensive should_deny_proxy call (which binds a socket per IP) in the
// unspecified-address expansion loops below.
@@ -449,12 +544,13 @@ impl DirectConnectorManagerData {
);
return;
}
let mut addr = (*listener).clone();
let mut addr = listener_url.clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
self.clone(),
dst_peer_id,
addr.to_string(),
priority,
));
} else {
tracing::error!(
@@ -475,7 +571,8 @@ impl DirectConnectorManagerData {
tasks.spawn(Self::try_connect_to_ip(
self.clone(),
dst_peer_id,
listener.to_string(),
listener_url.to_string(),
priority,
));
}
}
@@ -504,12 +601,13 @@ impl DirectConnectorManagerData {
);
return;
}
let mut addr = (*listener).clone();
let mut addr = listener_url.clone();
if addr.set_host(Some(format!("[{}]", ip).as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
self.clone(),
dst_peer_id,
addr.to_string(),
priority,
));
} else {
tracing::error!(
@@ -535,7 +633,8 @@ impl DirectConnectorManagerData {
tasks.spawn(Self::try_connect_to_ip(
self.clone(),
dst_peer_id,
listener.to_string(),
listener_url.to_string(),
priority,
));
}
}
@@ -553,15 +652,7 @@ impl DirectConnectorManagerData {
ip_list: GetIpListResponse,
) -> Result<(), Error> {
let enable_ipv6 = self.global_ctx.get_flags().enable_ipv6;
let available_listeners = ip_list
.listeners
.clone()
.into_iter()
.map(Into::<url::Url>::into)
.filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None })
.filter(|l| mapped_listener_port(l).is_some() && l.host().is_some())
.filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_)))
.collect::<Vec<_>>();
let mut available_listeners = available_listeners_from_ip_list(&ip_list, enable_ipv6);
tracing::debug!(?available_listeners, "got available listeners");
@@ -570,35 +661,30 @@ impl DirectConnectorManagerData {
}
let default_protocol = self.global_ctx.get_flags().default_protocol;
// sort available listeners, default protocol has the highest priority, udp is second, others just random
// highest priority is in the last
let mut available_listeners = available_listeners;
available_listeners.sort_by_key(|l| {
let scheme = l.scheme();
if scheme == default_protocol {
3
} else if scheme == "udp" {
2
} else {
1
}
});
// Sort by configured priority first (lower is better), then prefer the
// default protocol and UDP. The best candidate group is in the last slot.
sort_available_listeners(&mut available_listeners, &default_protocol);
while !available_listeners.is_empty() {
while let Some(cur_listener) = available_listeners.last() {
let mut tasks = JoinSet::new();
let mut listener_list = vec![];
let cur_scheme = available_listeners.last().unwrap().scheme().to_owned();
let cur_priority = cur_listener.priority;
let cur_scheme = cur_listener.url.scheme().to_owned();
while let Some(listener) = available_listeners.last() {
if listener.scheme() != cur_scheme {
if listener.priority != cur_priority || listener.url.scheme() != cur_scheme {
break;
}
tracing::debug!("try direct connect to peer with listener: {}", listener);
tracing::debug!(
%cur_priority,
"try direct connect to peer with listener: {}",
listener.url
);
self.spawn_direct_connect_task(dst_peer_id, &ip_list, listener, &mut tasks)
.await;
listener_list.push(listener.clone().to_string());
listener_list.push(listener.url.to_string());
available_listeners.pop();
}
@@ -606,12 +692,16 @@ impl DirectConnectorManagerData {
tracing::debug!(
?ret,
?dst_peer_id,
?cur_priority,
?cur_scheme,
?listener_list,
"all tasks finished for current scheme"
);
if self.peer_manager.has_directly_connected_conn(dst_peer_id) {
if self
.peer_manager
.has_directly_connected_conn_with_priority_at_most(dst_peer_id, cur_priority)
{
tracing::info!(
"direct connect to peer {} success, has direct conn",
dst_peer_id
@@ -666,13 +756,29 @@ impl DirectConnectorManagerData {
.await;
tracing::info!(?ret, ?dst_peer_id, "do_try_direct_connect return");
if peer_manager.has_directly_connected_conn(dst_peer_id) {
if peer_manager.has_directly_connected_conn_with_priority_at_most(
dst_peer_id,
DEFAULT_CONNECTION_PRIORITY,
) {
tracing::info!(
"direct connect to peer {} success, has direct conn",
dst_peer_id
);
return Ok(());
}
if peer_manager.has_directly_connected_conn(dst_peer_id) {
self.low_priority_direct_retry_backoff.insert(
dst_peer_id,
(),
Duration::from_secs(DIRECT_CONNECTOR_LOW_PRIORITY_RETRY_TIMEOUT_SEC),
);
tracing::info!(
"direct connect to peer {} skipped temporarily, only low-priority direct conn exists",
dst_peer_id
);
return Ok(());
}
}
}
}
@@ -715,6 +821,7 @@ impl PeerTaskLauncher for DirectConnectorLauncher {
async fn collect_peers_need_task(&self, data: &Self::Data) -> Vec<Self::CollectPeerItem> {
data.peer_black_list.cleanup();
data.low_priority_direct_retry_backoff.cleanup();
let my_peer_id = data.peer_manager.my_peer_id();
data.peer_manager
.list_peers()
@@ -722,7 +829,13 @@ impl PeerTaskLauncher for DirectConnectorLauncher {
.into_iter()
.filter(|peer_id| {
*peer_id != my_peer_id
&& !data.peer_manager.has_directly_connected_conn(*peer_id)
&& !data
.peer_manager
.has_directly_connected_conn_with_priority_at_most(
*peer_id,
DEFAULT_CONNECTION_PRIORITY,
)
&& !data.low_priority_direct_retry_backoff.contains(peer_id)
&& !data.peer_black_list.contains(peer_id)
})
.collect()
@@ -813,12 +926,16 @@ mod tests {
wait_route_appear_with_cost,
},
proto::peer_rpc::GetIpListResponse,
proto::peer_rpc::ListenerInfo,
tunnel::{IpScheme, TunnelScheme, matches_scheme},
};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::{TESTING, mapped_listener_port, resolve_mapped_listener_addrs};
use super::{
DEFAULT_CONNECTION_PRIORITY, TESTING, available_listeners_from_ip_list,
mapped_listener_port, resolve_mapped_listener_addrs, sort_available_listeners,
};
#[tokio::test]
async fn public_ipv6_candidate_rejects_easytier_managed_addr_even_in_tests() {
@@ -868,6 +985,68 @@ mod tests {
);
}
#[test]
// The sort key places configured priority above protocol preference: the
// priority-0 tcp listener wins over the priority-0 udp one (default protocol
// "tcp" preferred), and the priority-100 tcp listener comes last. Note the
// best candidate sits at the END of the sorted slice, so the assertion
// iterates in reverse.
fn available_listener_order_uses_priority_before_protocol() {
let ip_list = GetIpListResponse {
listener_infos: vec![
ListenerInfo {
url: Some("tcp://127.0.0.1:11010".parse().unwrap()),
priority: 100,
},
ListenerInfo {
url: Some("udp://127.0.0.1:11011".parse().unwrap()),
priority: 0,
},
ListenerInfo {
url: Some("tcp://127.0.0.1:11012".parse().unwrap()),
priority: 0,
},
],
..Default::default()
};
let mut listeners = available_listeners_from_ip_list(&ip_list, true);
sort_available_listeners(&mut listeners, "tcp");
// Reverse so the list reads best-first.
let ordered_urls = listeners
.iter()
.rev()
.map(|listener| listener.url.to_string())
.collect::<Vec<_>>();
assert_eq!(
ordered_urls,
vec![
"tcp://127.0.0.1:11012",
"udp://127.0.0.1:11011",
"tcp://127.0.0.1:11010",
]
);
}
#[test]
// Legacy responses (plain `listeners`, empty `listener_infos`) must map every
// entry to DEFAULT_CONNECTION_PRIORITY, with protocol preference alone
// deciding the order (default protocol "tcp" ends up last, i.e. best).
fn available_listener_order_keeps_legacy_listeners_at_default_priority() {
let mut ip_list = GetIpListResponse::default();
ip_list
.listeners
.push("udp://127.0.0.1:11010".parse().unwrap());
ip_list
.listeners
.push("tcp://127.0.0.1:11011".parse().unwrap());
let mut listeners = available_listeners_from_ip_list(&ip_list, true);
sort_available_listeners(&mut listeners, "tcp");
assert!(
listeners
.iter()
.all(|listener| listener.priority == DEFAULT_CONNECTION_PRIORITY)
);
assert_eq!(
listeners.last().unwrap().url.to_string(),
"tcp://127.0.0.1:11011"
);
}
#[tokio::test]
async fn resolve_mapped_listener_addrs_uses_default_ports() {
let wss_addrs = resolve_mapped_listener_addrs(&"wss://127.0.0.1".parse().unwrap())
+49 -20
View File
@@ -3,11 +3,13 @@ use std::{
sync::{Arc, Weak},
};
use dashmap::DashSet;
use dashmap::{DashMap, DashSet};
use tokio::{sync::mpsc, task::JoinSet, time::timeout};
use crate::{
common::{PeerId, dns::socket_addrs, join_joinset_background},
common::{
PeerId, config::DEFAULT_CONNECTION_PRIORITY, dns::socket_addrs, join_joinset_background,
},
peers::peer_conn::PeerConnId,
proto::{
api::instance::{
@@ -16,7 +18,7 @@ use crate::{
},
rpc_types::{self, controller::BaseController},
},
tunnel::{IpVersion, TunnelConnector},
tunnel::{IpVersion, PrioritizedConnector, TunnelConnector},
utils::weak_upgrade,
};
@@ -32,7 +34,7 @@ use crate::{
use super::create_connector_by_url;
type ConnectorMap = Arc<DashSet<url::Url>>;
type ConnectorMap = Arc<DashMap<url::Url, u32>>;
#[derive(Debug, Clone)]
struct ReconnResult {
@@ -43,7 +45,7 @@ struct ReconnResult {
struct ConnectorManagerData {
connectors: ConnectorMap,
reconnecting: DashSet<url::Url>,
reconnecting: DashMap<url::Url, u32>,
peer_manager: Weak<PeerManager>,
alive_conn_urls: Arc<DashSet<url::Url>>,
// user removed connector urls
@@ -60,14 +62,14 @@ pub struct ManualConnectorManager {
impl ManualConnectorManager {
pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc<PeerManager>) -> Self {
let connectors = Arc::new(DashSet::new());
let connectors = Arc::new(DashMap::new());
let tasks = JoinSet::new();
let mut ret = Self {
global_ctx: global_ctx.clone(),
data: Arc::new(ConnectorManagerData {
connectors,
reconnecting: DashSet::new(),
reconnecting: DashMap::new(),
peer_manager: Arc::downgrade(&peer_manager),
alive_conn_urls: Arc::new(DashSet::new()),
removed_conn_urls: Arc::new(DashSet::new()),
@@ -85,14 +87,26 @@ impl ManualConnectorManager {
pub fn add_connector<T>(&self, connector: T)
where
T: TunnelConnector + 'static,
T: TunnelConnector,
{
tracing::info!("add_connector: {}", connector.remote_url());
self.data.connectors.insert(connector.remote_url());
let priority = connector.priority();
self.data
.connectors
.insert(connector.remote_url(), priority);
}
pub async fn add_connector_by_url(&self, url: url::Url) -> Result<(), Error> {
self.data.connectors.insert(url);
self.add_connector_by_url_with_priority(url, DEFAULT_CONNECTION_PRIORITY)
.await
}
pub async fn add_connector_by_url_with_priority(
&self,
url: url::Url,
priority: u32,
) -> Result<(), Error> {
self.data.connectors.insert(url, priority);
Ok(())
}
@@ -138,19 +152,25 @@ impl ManualConnectorManager {
Connector {
url: Some(conn_url.into()),
status: status.into(),
priority: *item.value(),
},
);
}
let reconnecting_urls: BTreeSet<url::Url> =
self.data.reconnecting.iter().map(|x| x.clone()).collect();
let reconnecting_urls: BTreeSet<_> = self
.data
.reconnecting
.iter()
.map(|item| (item.key().clone(), *item.value()))
.collect();
for conn_url in reconnecting_urls {
for (conn_url, priority) in reconnecting_urls {
ret.insert(
0,
Connector {
url: Some(conn_url.into()),
status: ConnectorStatus::Connecting.into(),
priority,
},
);
}
@@ -177,16 +197,20 @@ impl ManualConnectorManager {
for dead_url in dead_urls {
let data_clone = data.clone();
let sender = reconn_result_send.clone();
data.connectors.remove(&dead_url).unwrap();
let insert_succ = data.reconnecting.insert(dead_url.clone());
assert!(insert_succ);
let priority = data
.connectors
.remove(&dead_url)
.map(|(_, priority)| priority)
.unwrap_or(DEFAULT_CONNECTION_PRIORITY);
let previous = data.reconnecting.insert(dead_url.clone(), priority);
assert!(previous.is_none());
tasks.lock().unwrap().spawn(async move {
let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone() ).await;
let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone(), priority).await;
let _ = sender.send(reconn_ret).await;
data_clone.reconnecting.remove(&dead_url).unwrap();
data_clone.connectors.insert(dead_url.clone());
data_clone.connectors.insert(dead_url.clone(), priority);
});
}
tracing::info!("reconn_interval tick, done");
@@ -206,7 +230,7 @@ impl ManualConnectorManager {
if data.connectors.remove(url).is_some() {
tracing::warn!("connector: {}, removed", url);
continue;
} else if data.reconnecting.contains(url) {
} else if data.reconnecting.contains_key(url) {
tracing::warn!("connector: {}, reconnecting, remove later.", url);
remove_later.insert(url.clone());
continue;
@@ -244,6 +268,7 @@ impl ManualConnectorManager {
data: Arc<ConnectorManagerData>,
dead_url: String,
ip_version: IpVersion,
priority: u32,
) -> Result<ReconnResult, Error> {
let connector =
create_connector_by_url(&dead_url, &data.global_ctx.clone(), ip_version).await?;
@@ -257,7 +282,9 @@ impl ManualConnectorManager {
)));
};
let (peer_id, conn_id) = pm.try_direct_connect(connector).await?;
let (peer_id, conn_id) = pm
.try_direct_connect(PrioritizedConnector::new(connector, priority))
.await?;
tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url);
Ok(ReconnResult {
dead_url,
@@ -269,6 +296,7 @@ impl ManualConnectorManager {
async fn conn_reconnect(
data: Arc<ConnectorManagerData>,
dead_url: url::Url,
priority: u32,
) -> Result<ReconnResult, Error> {
tracing::info!("reconnect: {}", dead_url);
@@ -326,6 +354,7 @@ impl ManualConnectorManager {
data.clone(),
dead_url.to_string(),
ip_version,
priority,
),
)
.await;
+4 -1
View File
@@ -472,7 +472,10 @@ impl PeerTaskLauncher for TcpHolePunchPeerTaskLauncher {
continue;
}
if data.peer_mgr.get_peer_map().has_peer(peer_id) {
if data.peer_mgr.has_conn_with_priority_at_most(
peer_id,
crate::common::config::DEFAULT_CONNECTION_PRIORITY,
) {
tracing::trace!(peer_id, "tcp hole punch task collect skip already has peer");
continue;
}
+4 -1
View File
@@ -474,7 +474,10 @@ impl PeerTaskLauncher for UdpHolePunchPeerTaskLauncher {
continue;
}
if data.peer_mgr.get_peer_map().has_peer(peer_id) {
if data.peer_mgr.has_conn_with_priority_at_most(
peer_id,
crate::common::config::DEFAULT_CONNECTION_PRIORITY,
) {
continue;
}
+2
View File
@@ -923,6 +923,7 @@ impl NetworkOptions {
.parse()
.with_context(|| format!("failed to parse peer uri: {}", p))?,
peer_public_key: None,
priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY,
});
}
cfg.set_peers(peers);
@@ -960,6 +961,7 @@ impl NetworkOptions {
format!("failed to parse external node uri: {}", external_nodes)
})?,
peer_public_key: None,
priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY,
});
cfg.set_peers(old_peers);
}
+67 -14
View File
@@ -43,7 +43,7 @@ use easytier::{
},
instance::{
AclManageRpc, AclManageRpcClientFactory, Connector, ConnectorManageRpc,
ConnectorManageRpcClientFactory, CredentialManageRpc,
ConnectorManageRpcClientFactory, ConnectorStatus, CredentialManageRpc,
CredentialManageRpcClientFactory, DumpRouteRequest, ForeignNetworkEntryPb,
GenerateCredentialRequest, GetAclStatsRequest, GetPrometheusStatsRequest,
GetStatsRequest, GetVpnPortalInfoRequest, GetWhitelistRequest,
@@ -236,6 +236,8 @@ enum ConnectorSubCommand {
Add {
#[arg(help = "connector url, e.g., tcp://1.2.3.4:11010")]
url: String,
#[arg(short = 'p', long = "priority", default_value_t = easytier::common::config::DEFAULT_CONNECTION_PRIORITY, help = "connection priority; lower values are preferred")]
priority: u32,
},
/// Remove a connector
Remove {
@@ -254,7 +256,11 @@ struct MappedListenerArgs {
#[derive(Subcommand, Debug)]
enum MappedListenerSubCommand {
/// Add Mapped Listener
Add { url: String },
Add {
url: String,
#[arg(short = 'p', long = "priority", default_value_t = easytier::common::config::DEFAULT_CONNECTION_PRIORITY, help = "listener priority; lower values are preferred")]
priority: u32,
},
/// Remove Mapped Listener
Remove { url: String },
/// List Existing Mapped Listener
@@ -1247,6 +1253,7 @@ impl<'a> CommandHandler<'a> {
&self,
url: &str,
action: ConfigPatchAction,
priority: Option<u32>,
) -> Result<(), Error> {
let url = match action {
ConfigPatchAction::Add => Self::connector_validate_url(url)?,
@@ -1267,6 +1274,7 @@ impl<'a> CommandHandler<'a> {
connectors: vec![UrlPatch {
action: action.into(),
url: Some(url.into()),
priority,
}],
..Default::default()
}),
@@ -1281,11 +1289,12 @@ impl<'a> CommandHandler<'a> {
&self,
url: &str,
action: ConfigPatchAction,
priority: Option<u32>,
) -> Result<(), Error> {
let url = url.to_string();
self.apply_to_instances(|handler| {
let url = url.clone();
Box::pin(async move { handler.apply_connector_modify(&url, action).await })
Box::pin(async move { handler.apply_connector_modify(&url, action, priority).await })
})
.await
}
@@ -1309,6 +1318,8 @@ impl<'a> CommandHandler<'a> {
tx_bytes: String,
#[tabled(rename = "tunnel")]
tunnel_proto: String,
#[tabled(rename = "prio")]
priority: String,
#[tabled(rename = "NAT")]
nat_type: String,
#[tabled(skip)]
@@ -1338,6 +1349,10 @@ impl<'a> CommandHandler<'a> {
rx_bytes: format_size(p.get_rx_bytes().unwrap_or(0), humansize::DECIMAL),
tx_bytes: format_size(p.get_tx_bytes().unwrap_or(0), humansize::DECIMAL),
tunnel_proto: p.get_conn_protos().unwrap_or_default().join(","),
priority: p
.get_conn_priority()
.map(|priority| priority.to_string())
.unwrap_or_else(|| "-".to_string()),
nat_type: p.get_udp_nat_type(),
id: route.peer_id.to_string(),
version: if route.version.is_empty() {
@@ -1363,6 +1378,7 @@ impl<'a> CommandHandler<'a> {
rx_bytes: "-".to_string(),
tx_bytes: "-".to_string(),
tunnel_proto: "-".to_string(),
priority: "-".to_string(),
nat_type: if let Some(info) = p.stun_info {
info.udp_nat_type().as_str_name().to_string()
} else {
@@ -1826,6 +1842,29 @@ impl<'a> CommandHandler<'a> {
}
async fn handle_connector_list(&self) -> Result<(), Error> {
/// Row model for the connector list table / JSON output.
#[derive(tabled::Tabled, serde::Serialize)]
struct ConnectorTableItem {
    url: String,
    status: String,
    priority: String,
}
impl From<Connector> for ConnectorTableItem {
    fn from(connector: Connector) -> Self {
        // Render the URL as text; an unset URL becomes the empty string.
        let url = match connector.url {
            Some(u) => Into::<url::Url>::into(u).to_string(),
            None => String::new(),
        };
        // Show the enum variant name when the status code is known,
        // otherwise fall back to the raw numeric value.
        let status = match ConnectorStatus::try_from(connector.status) {
            Ok(s) => format!("{:?}", s),
            Err(_) => connector.status.to_string(),
        };
        Self {
            url,
            status,
            priority: connector.priority.to_string(),
        }
    }
}
let results = self
.collect_instance_results(|handler| Box::pin(handler.fetch_connector_list()))
.await?;
@@ -1833,8 +1872,13 @@ impl<'a> CommandHandler<'a> {
return self.print_json_results(results);
}
self.print_results(&results, |connectors| {
println!("response: {:#?}", connectors);
Ok(())
let mut items = connectors
.iter()
.cloned()
.map(ConnectorTableItem::from)
.collect::<Vec<_>>();
items.sort_by(|a, b| a.url.cmp(&b.url));
print_output(&items, self.output_format, &[], &[], self.no_trunc)
})
}
@@ -1873,6 +1917,7 @@ impl<'a> CommandHandler<'a> {
&self,
url: &str,
action: ConfigPatchAction,
priority: Option<u32>,
) -> Result<(), Error> {
let url = Self::mapped_listener_validate_url(url)?;
let client = self.get_config_client().await?;
@@ -1882,6 +1927,7 @@ impl<'a> CommandHandler<'a> {
mapped_listeners: vec![UrlPatch {
action: action.into(),
url: Some(url.into()),
priority,
}],
..Default::default()
}),
@@ -1896,11 +1942,16 @@ impl<'a> CommandHandler<'a> {
&self,
url: &str,
action: ConfigPatchAction,
priority: Option<u32>,
) -> Result<(), Error> {
let url = url.to_string();
self.apply_to_instances(|handler| {
let url = url.clone();
Box::pin(async move { handler.apply_mapped_listener_modify(&url, action).await })
Box::pin(async move {
handler
.apply_mapped_listener_modify(&url, action, priority)
.await
})
})
.await
}
@@ -2883,15 +2934,17 @@ async fn main() -> Result<(), Error> {
}
},
SubCommand::Connector(conn_args) => match conn_args.sub_command {
Some(ConnectorSubCommand::Add { url }) => {
Some(ConnectorSubCommand::Add { url, priority }) => {
handler
.handle_connector_modify(&url, ConfigPatchAction::Add)
.handle_connector_modify(&url, ConfigPatchAction::Add, Some(priority))
.await?;
println!("connector add applied to selected instance(s): {url}");
println!(
"connector add applied to selected instance(s): {url}, priority: {priority}"
);
}
Some(ConnectorSubCommand::Remove { url }) => {
handler
.handle_connector_modify(&url, ConfigPatchAction::Remove)
.handle_connector_modify(&url, ConfigPatchAction::Remove, None)
.await?;
println!("connector remove applied to selected instance(s): {url}");
}
@@ -2904,15 +2957,15 @@ async fn main() -> Result<(), Error> {
},
SubCommand::MappedListener(mapped_listener_args) => {
match mapped_listener_args.sub_command {
Some(MappedListenerSubCommand::Add { url }) => {
Some(MappedListenerSubCommand::Add { url, priority }) => {
handler
.handle_mapped_listener_modify(&url, ConfigPatchAction::Add)
.handle_mapped_listener_modify(&url, ConfigPatchAction::Add, Some(priority))
.await?;
println!("add mapped listener: {url}");
println!("add mapped listener: {url}, priority: {priority}");
}
Some(MappedListenerSubCommand::Remove { url }) => {
handler
.handle_mapped_listener_modify(&url, ConfigPatchAction::Remove)
.handle_mapped_listener_modify(&url, ConfigPatchAction::Remove, None)
.await?;
println!("remove mapped listener: {url}");
}
+40 -9
View File
@@ -560,16 +560,39 @@ impl InstanceConfigPatcher {
return Ok(());
}
let global_ctx = weak_upgrade(&self.global_ctx)?;
let mut current_mapped_listeners = global_ctx.config.get_mapped_listeners();
let current_mapped_listener_configs = global_ctx.config.get_mapped_listener_configs();
let mut priority_by_url = current_mapped_listener_configs
.iter()
.map(|listener| (listener.url.clone(), listener.priority))
.collect::<std::collections::HashMap<_, _>>();
let mut current_mapped_listeners = current_mapped_listener_configs
.into_iter()
.map(|listener| listener.url)
.collect();
for patch in &mapped_listeners {
if let (Some(url), Some(priority)) = (&patch.url, patch.priority) {
priority_by_url.insert(url.clone().into(), priority);
}
}
let patches = mapped_listeners.into_iter().map(Into::into).collect();
InstanceConfigPatcher::trace_patchables(&patches);
crate::proto::api::config::patch_vec(&mut current_mapped_listeners, patches);
if current_mapped_listeners.is_empty() {
global_ctx.config.set_mapped_listeners(None);
global_ctx.config.set_mapped_listener_configs(None);
} else {
let mapped_listener_configs = current_mapped_listeners
.into_iter()
.map(|url| {
let priority = priority_by_url
.get(&url)
.copied()
.unwrap_or(crate::common::config::DEFAULT_CONNECTION_PRIORITY);
crate::common::config::ListenerConfig::new(url, priority)
})
.collect();
global_ctx
.config
.set_mapped_listeners(Some(current_mapped_listeners));
.set_mapped_listener_configs(Some(mapped_listener_configs));
}
Ok(())
}
@@ -590,7 +613,14 @@ impl InstanceConfigPatcher {
match ConfigPatchAction::try_from(connector.action) {
Ok(ConfigPatchAction::Add) => {
tracing::info!("Connector added: {}", url);
conn_manager.add_connector_by_url(url).await?;
conn_manager
.add_connector_by_url_with_priority(
url,
connector
.priority
.unwrap_or(crate::common::config::DEFAULT_CONNECTION_PRIORITY),
)
.await?;
}
Ok(ConfigPatchAction::Remove) => {
tracing::info!("Connector removed: {}", url);
@@ -742,7 +772,7 @@ impl Instance {
async fn add_initial_peers(&self) -> Result<(), Error> {
for peer in self.global_ctx.config.get_peers().iter() {
self.get_conn_manager()
.add_connector_by_url(peer.uri.clone())
.add_connector_by_url_with_priority(peer.uri.clone(), peer.priority)
.await?;
}
Ok(())
@@ -1228,11 +1258,12 @@ impl Instance {
_request: ListMappedListenerRequest,
) -> Result<ListMappedListenerResponse, rpc_types::error::Error> {
let mut ret = ListMappedListenerResponse::default();
let urls = weak_upgrade(&self.0)?.config.get_mapped_listeners();
let mapped_listeners: Vec<MappedListener> = urls
let listener_configs = weak_upgrade(&self.0)?.config.get_mapped_listener_configs();
let mapped_listeners: Vec<MappedListener> = listener_configs
.into_iter()
.map(|u| MappedListener {
url: Some(u.into()),
.map(|listener| MappedListener {
url: Some(listener.url.into()),
priority: listener.priority,
})
.collect();
ret.mappedlisteners = mapped_listeners;
+26 -5
View File
@@ -91,6 +91,7 @@ pub type ListenerCreator = Box<dyn ListenerCreatorTrait>;
/// Deferred listener definition: the stored factory is invoked to create the
/// actual listener instance (the run loop re-invokes it via the shared Arc).
struct ListenerFactory {
    // Closure that builds a fresh listener; Arc-wrapped so it can be cloned
    // into the spawned run loop.
    creator_fn: Arc<ListenerCreator>,
    // NOTE(review): presumably "must succeed" — a failing listener of this
    // kind is treated as an error rather than best-effort; confirm against
    // the run loop's error handling.
    must_succ: bool,
    // Connection priority attached to connections accepted through this
    // listener; lower values are preferred during connection selection.
    priority: u32,
}
pub struct ListenerManager<H> {
@@ -125,8 +126,9 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
)
.await?;
for l in self.global_ctx.config.get_listener_uris().iter() {
let l = l.clone();
for listener_cfg in self.global_ctx.config.get_listener_configs().iter() {
let l = listener_cfg.url.clone();
let priority = listener_cfg.priority;
let Ok(_) = create_listener_by_url(&l, self.global_ctx.clone()) else {
let msg = format!("failed to get listener by url: {}, maybe not supported", l);
self.global_ctx
@@ -136,9 +138,10 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
let ctx = self.global_ctx.clone();
let listener = l.clone();
self.add_listener(
self.add_listener_with_priority(
move || create_listener_by_url(&listener, ctx.clone()).unwrap(),
true,
priority,
)
.await?;
@@ -153,9 +156,10 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
.set_host(Some("[::]".to_string().as_str()))
.with_context(|| format!("failed to set ipv6 host for listener: {}", l))?;
let ctx = self.global_ctx.clone();
self.add_listener(
self.add_listener_with_priority(
move || create_listener_by_url(&ipv6_listener, ctx.clone()).unwrap(),
false,
priority,
)
.await?;
}
@@ -168,10 +172,25 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
&mut self,
creator: C,
must_succ: bool,
) -> Result<(), Error> {
self.add_listener_with_priority(
creator,
must_succ,
crate::common::config::DEFAULT_CONNECTION_PRIORITY,
)
.await
}
/// Registers a listener factory together with an explicit connection
/// priority (lower values are preferred during connection selection).
///
/// `must_succ` marks the listener as required rather than best-effort.
/// The factory is only recorded here; actual listening starts later.
pub async fn add_listener_with_priority<C: ListenerCreatorTrait + 'static>(
    &mut self,
    creator: C,
    must_succ: bool,
    priority: u32,
) -> Result<(), Error> {
    let factory = ListenerFactory {
        creator_fn: Arc::new(Box::new(creator)),
        must_succ,
        priority,
    };
    self.listeners.push(factory);
    Ok(())
}
@@ -181,6 +200,7 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
creator: Arc<ListenerCreator>,
peer_manager: Weak<H>,
global_ctx: ArcGlobalCtx,
priority: u32,
) {
let mut err_count = 0;
loop {
@@ -189,7 +209,7 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
match l.listen().await {
Ok(_) => {
err_count = 0;
global_ctx.add_running_listener(l.local_url());
global_ctx.add_running_listener_with_priority(l.local_url(), priority);
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
}
Err(e) => {
@@ -270,6 +290,7 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
listener.creator_fn.clone(),
self.peer_manager.clone(),
self.global_ctx.clone(),
listener.priority,
));
}
+3
View File
@@ -551,6 +551,7 @@ impl NetworkConfig {
format!("failed to parse public server uri: {}", public_server_url)
})?,
peer_public_key: None,
priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY,
}]);
}
NetworkingMethod::Manual => {
@@ -564,6 +565,7 @@ impl NetworkConfig {
.parse()
.with_context(|| format!("failed to parse peer uri: {}", peer_url))?,
peer_public_key: None,
priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY,
});
}
if !peers.is_empty() {
@@ -1109,6 +1111,7 @@ mod tests {
peers.push(crate::common::config::PeerConfig {
uri,
peer_public_key: None,
priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY,
});
}
config.set_peers(peers);
@@ -279,7 +279,7 @@ impl ForeignNetworkEntry {
flags.disable_relay_quic = !global_ctx.get_flags().enable_relay_foreign_network_quic;
config.set_flags(flags);
config.set_mapped_listeners(Some(global_ctx.config.get_mapped_listeners()));
config.set_mapped_listener_configs(Some(global_ctx.config.get_mapped_listener_configs()));
let foreign_global_ctx = Arc::new(GlobalCtx::new(config));
foreign_global_ctx
@@ -291,8 +291,8 @@ impl ForeignNetworkEntry {
Self::desired_avoid_relay_data_feature_flag(&global_ctx, relay_data);
foreign_global_ctx.set_base_advertised_feature_flags(feature_flag);
for u in global_ctx.get_running_listeners().into_iter() {
foreign_global_ctx.add_running_listener(u);
for listener in global_ctx.get_running_listener_configs().into_iter() {
foreign_global_ctx.add_running_listener_with_priority(listener.url, listener.priority);
}
foreign_global_ctx
+124 -14
View File
@@ -188,23 +188,37 @@ impl Peer {
async fn select_conn(&self) -> Option<ArcPeerConn> {
let default_conn_id = self.default_conn_id.load();
if let Some(conn) = self.conns.get(&default_conn_id) {
return Some(conn.clone());
}
let default_conn = self.conns.get(&default_conn_id).and_then(|conn| {
if conn.is_closed() {
None
} else {
Some((conn.priority(), conn.clone()))
}
});
// find a conn with the smallest latency
let mut min_latency = u64::MAX;
for conn in self.conns.iter() {
let latency = conn.value().get_stats().latency_us;
if latency < min_latency {
min_latency = latency;
self.default_conn_id.store(conn.get_conn_id());
if let Some((default_priority, default_conn)) = default_conn {
let has_better_conn = self.conns.iter().any(|conn| {
!conn.value().is_closed() && conn.value().priority() < default_priority
});
if !has_better_conn {
return Some(default_conn);
}
}
self.conns
.get(&self.default_conn_id.load())
.map(|conn| conn.clone())
// Prefer lower listener priority first, then use latency to pick the
// initial connection within that priority class. Keep the selected conn
// sticky until a strictly better priority appears.
let selected_conn_id = self
.conns
.iter()
.filter(|conn| !conn.value().is_closed())
.min_by_key(|conn| (conn.value().priority(), conn.value().get_stats().latency_us))
.map(|conn| conn.get_conn_id());
selected_conn_id.and_then(|conn_id| {
self.default_conn_id.store(conn_id);
self.conns.get(&conn_id).map(|conn| conn.clone())
})
}
pub async fn send_msg(&self, msg: ZCPacket) -> Result<(), Error> {
@@ -249,12 +263,26 @@ impl Peer {
self.conns.iter().any(|entry| !entry.value().is_closed())
}
/// Whether this peer has at least one open connection whose priority is
/// `<= priority` (lower numeric priority is preferred).
pub fn has_conn_with_priority_at_most(&self, priority: u32) -> bool {
    for entry in self.conns.iter() {
        let conn = entry.value();
        if !conn.is_closed() && conn.priority() <= priority {
            return true;
        }
    }
    false
}
/// Whether this peer has at least one open connection that was established
/// directly (i.e. not via hole punching).
pub fn has_directly_connected_conn(&self) -> bool {
    for entry in self.conns.iter() {
        let conn = entry.value();
        if !conn.is_closed() && !conn.is_hole_punched() {
            return true;
        }
    }
    false
}
/// Whether this peer has an open, directly-established (non-hole-punched)
/// connection whose priority is `<= priority`.
pub fn has_directly_connected_conn_with_priority_at_most(&self, priority: u32) -> bool {
    for entry in self.conns.iter() {
        let conn = entry.value();
        if conn.is_closed() || conn.is_hole_punched() {
            continue;
        }
        if conn.priority() <= priority {
            return true;
        }
    }
    false
}
pub fn get_directly_connections(&self) -> DashSet<uuid::Uuid> {
self.conns
.iter()
@@ -298,7 +326,7 @@ mod tests {
use crate::{
common::{
config::{NetworkIdentity, PeerConfig},
config::{DEFAULT_CONNECTION_PRIORITY, NetworkIdentity, PeerConfig},
global_ctx::{GlobalCtx, tests::get_mock_global_ctx},
new_peer_id,
},
@@ -380,6 +408,87 @@ mod tests {
close_handler.await.unwrap().unwrap();
}
#[tokio::test]
// Verifies that connection selection prefers the lowest numeric priority:
// a DEFAULT_CONNECTION_PRIORITY (0) connection must displace an existing
// priority-100 one, while a second priority-100 connection must not.
// Naming note: "low"/"high" refer to *preference*, not the numeric value —
// the "high" conn carries the smaller (more preferred) priority number.
async fn select_conn_prefers_lower_priority_before_latency() {
    let (packet_send, _packet_recv) = create_packet_recv_chan();
    let global_ctx = get_mock_global_ctx();
    let local_peer_id = new_peer_id();
    let remote_peer_id = new_peer_id();
    let peer = Peer::new(remote_peer_id, packet_send, global_ctx.clone());
    let ps = Arc::new(PeerSessionStore::new());

    // Phase 1: a single priority-100 connection; with no alternatives it
    // must be selected.
    let (low_client_tunnel, low_server_tunnel) = create_ring_tunnel_pair();
    let mut low_client_conn = PeerConn::new(
        local_peer_id,
        global_ctx.clone(),
        low_client_tunnel,
        ps.clone(),
    );
    low_client_conn.set_priority(100);
    let low_conn_id = low_client_conn.get_conn_id();
    let mut low_server_conn = PeerConn::new(
        remote_peer_id,
        global_ctx.clone(),
        low_server_tunnel,
        ps.clone(),
    );
    // Client and server handshakes must run concurrently — each side blocks
    // on the other's messages.
    let (client_ret, server_ret) = tokio::join!(
        low_client_conn.do_handshake_as_client(),
        low_server_conn.do_handshake_as_server()
    );
    client_ret.unwrap();
    server_ret.unwrap();
    peer.add_peer_conn(low_client_conn).await.unwrap();
    assert_eq!(peer.select_conn().await.unwrap().get_conn_id(), low_conn_id);

    // Phase 2: add another connection with the *same* priority (100). The
    // selection must stay sticky on the original connection — an equal
    // priority is not "strictly better".
    let (same_priority_client_tunnel, same_priority_server_tunnel) = create_ring_tunnel_pair();
    let mut same_priority_client_conn = PeerConn::new(
        local_peer_id,
        global_ctx.clone(),
        same_priority_client_tunnel,
        ps.clone(),
    );
    same_priority_client_conn.set_priority(100);
    let mut same_priority_server_conn = PeerConn::new(
        remote_peer_id,
        global_ctx.clone(),
        same_priority_server_tunnel,
        ps.clone(),
    );
    let (client_ret, server_ret) = tokio::join!(
        same_priority_client_conn.do_handshake_as_client(),
        same_priority_server_conn.do_handshake_as_server()
    );
    client_ret.unwrap();
    server_ret.unwrap();
    peer.add_peer_conn(same_priority_client_conn).await.unwrap();
    assert_eq!(peer.select_conn().await.unwrap().get_conn_id(), low_conn_id);

    // Phase 3: add a DEFAULT_CONNECTION_PRIORITY (0) connection — a strictly
    // smaller priority number — which must take over as the selected conn.
    let (high_client_tunnel, high_server_tunnel) = create_ring_tunnel_pair();
    let mut high_client_conn = PeerConn::new(
        local_peer_id,
        global_ctx.clone(),
        high_client_tunnel,
        ps.clone(),
    );
    high_client_conn.set_priority(DEFAULT_CONNECTION_PRIORITY);
    let high_conn_id = high_client_conn.get_conn_id();
    let mut high_server_conn =
        PeerConn::new(remote_peer_id, global_ctx, high_server_tunnel, ps);
    let (client_ret, server_ret) = tokio::join!(
        high_client_conn.do_handshake_as_client(),
        high_server_conn.do_handshake_as_server()
    );
    client_ret.unwrap();
    server_ret.unwrap();
    peer.add_peer_conn(high_client_conn).await.unwrap();
    assert_eq!(
        peer.select_conn().await.unwrap().get_conn_id(),
        high_conn_id
    );
}
#[tokio::test]
async fn reject_peer_conn_with_mismatched_identity_type() {
let (packet_send, _packet_recv) = create_packet_recv_chan();
@@ -423,6 +532,7 @@ mod tests {
.local_public_key
.unwrap(),
),
priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY,
}]);
let mut shared_client_conn = PeerConn::new(
local_peer_id,
+18 -1
View File
@@ -37,7 +37,7 @@ use crate::utils::BoxExt;
use crate::{
common::{
PeerId,
config::{NetworkIdentity, NetworkSecretDigest},
config::{DEFAULT_CONNECTION_PRIORITY, NetworkIdentity, NetworkSecretDigest},
error::Error,
global_ctx::ArcGlobalCtx,
},
@@ -305,6 +305,7 @@ pub struct PeerConn {
// remote or local
is_hole_punched: bool,
priority: u32,
close_event_notifier: Arc<PeerConnCloseNotify>,
@@ -393,6 +394,7 @@ impl PeerConn {
is_client: None,
is_hole_punched: true,
priority: DEFAULT_CONNECTION_PRIORITY,
close_event_notifier: Arc::new(PeerConnCloseNotify::new(conn_id)),
@@ -442,6 +444,14 @@ impl PeerConn {
self.is_hole_punched
}
/// Sets this connection's priority (lower values are preferred during
/// connection selection). The value is also carried in the handshake
/// messages sent to the remote side.
pub fn set_priority(&mut self, priority: u32) {
    self.priority = priority;
}
/// Returns this connection's priority (lower values are preferred).
pub fn priority(&self) -> u32 {
    self.priority
}
/// True once the close-event notifier reports this connection as closed.
pub fn is_closed(&self) -> bool {
    self.close_event_notifier.is_closed()
}
@@ -529,6 +539,7 @@ impl PeerConn {
version: VERSION,
features: Vec::new(),
network_name: network.network_name.clone(),
connection_priority: self.priority,
..Default::default()
};
@@ -821,6 +832,7 @@ impl PeerConn {
a_session_generation,
a_conn_id: Some(a_conn_id.into()),
client_encryption_algorithm: self.my_encrypt_algo.clone(),
connection_priority: self.priority,
};
let mut hs = builder
@@ -1072,6 +1084,7 @@ impl PeerConn {
Some(&mut hs),
first_msg1,
)?;
self.priority = msg1_pb.connection_priority;
let remote_network_name = msg1_pb.a_network_name.clone();
self.record_control_rx(&remote_network_name, first_msg1_len);
@@ -1227,6 +1240,7 @@ impl PeerConn {
features: Vec::new(),
network_secret_digest: noise.secret_digest.clone(),
connection_priority: self.priority,
}
}
@@ -1264,6 +1278,7 @@ impl PeerConn {
self.is_client = Some(false);
} else if hdr.packet_type == PacketType::HandShake as u8 {
let rsp = Self::decode_handshake_packet(&first_pkt)?;
self.priority = rsp.connection_priority;
handshake_recved(self, &rsp.network_name)?;
tracing::info!("handshake request: {:?}", rsp);
self.record_control_rx(&rsp.network_name, first_pkt.buf_len() as u64);
@@ -1566,6 +1581,7 @@ impl PeerConn {
.as_ref()
.map(|x| x.peer_identity_type as i32)
.unwrap_or(PeerIdentityType::Admin as i32),
priority: self.priority,
}
}
@@ -2088,6 +2104,7 @@ pub mod tests {
.local_public_key
.unwrap(),
),
priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY,
}]);
let ps = Arc::new(PeerSessionStore::new());
+44 -1
View File
@@ -22,6 +22,7 @@ use crate::{
common::{
PeerId,
compressor::{Compressor as _, DefaultCompressor},
config::DEFAULT_CONNECTION_PRIORITY,
constants::EASYTIER_VERSION,
error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity},
@@ -31,6 +32,7 @@ use crate::{
},
peers::{
PeerPacketFilter,
peer::Peer,
peer_conn::PeerConn,
peer_rpc::PeerRpcManagerTransport,
peer_session::PeerSessionStore,
@@ -594,6 +596,22 @@ impl PeerManager {
tunnel: Box<dyn Tunnel>,
is_directly_connected: bool,
peer_id_hint: Option<PeerId>,
) -> Result<(PeerId, PeerConnId), Error> {
self.add_client_tunnel_with_peer_id_hint_and_priority(
tunnel,
is_directly_connected,
peer_id_hint,
DEFAULT_CONNECTION_PRIORITY,
)
.await
}
pub(crate) async fn add_client_tunnel_with_peer_id_hint_and_priority(
&self,
tunnel: Box<dyn Tunnel>,
is_directly_connected: bool,
peer_id_hint: Option<PeerId>,
priority: u32,
) -> Result<(PeerId, PeerConnId), Error> {
let mut peer = PeerConn::new_with_peer_id_hint(
self.my_peer_id,
@@ -602,6 +620,7 @@ impl PeerManager {
peer_id_hint,
self.peer_session_store.clone(),
);
peer.set_priority(priority);
peer.set_is_hole_punched(!is_directly_connected);
peer.do_handshake_as_client().await?;
let conn_id = peer.get_conn_id();
@@ -616,6 +635,14 @@ impl PeerManager {
Ok((peer_id, conn_id))
}
/// Looks up a peer by id, checking the main peer map first and then the
/// foreign-network client's peer map.
fn get_peer_by_id(&self, peer_id: PeerId) -> Option<Arc<Peer>> {
    if let Some(peer) = self.peers.get_peer_by_id(peer_id) {
        return Some(peer);
    }
    self.foreign_network_client
        .get_peer_map()
        .get_peer_by_id(peer_id)
}
pub fn has_directly_connected_conn(&self, peer_id: PeerId) -> bool {
if let Some(peer) = self.peers.get_peer_by_id(peer_id) {
peer.has_directly_connected_conn()
@@ -624,6 +651,20 @@ impl PeerManager {
}
}
/// Whether `peer_id` (in either the main or foreign-network peer map) has an
/// open, directly-established connection with priority `<= priority`.
/// Returns `false` when the peer is unknown.
pub(crate) fn has_directly_connected_conn_with_priority_at_most(
    &self,
    peer_id: PeerId,
    priority: u32,
) -> bool {
    match self.get_peer_by_id(peer_id) {
        Some(peer) => peer.has_directly_connected_conn_with_priority_at_most(priority),
        None => false,
    }
}
/// Whether `peer_id` (in either peer map) has an open connection with
/// priority `<= priority`. Returns `false` when the peer is unknown.
pub(crate) fn has_conn_with_priority_at_most(&self, peer_id: PeerId, priority: u32) -> bool {
    self.get_peer_by_id(peer_id)
        .map_or(false, |peer| peer.has_conn_with_priority_at_most(priority))
}
#[tracing::instrument]
pub async fn try_direct_connect<C>(&self, connector: C) -> Result<(PeerId, PeerConnId), Error>
where
@@ -642,11 +683,12 @@ impl PeerManager {
where
C: TunnelConnector + Debug,
{
let priority = connector.priority();
let ns = self.global_ctx.net_ns.clone();
let t = ns
.run_async(|| async move { connector.connect().await })
.await?;
self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint)
self.add_client_tunnel_with_peer_id_hint_and_priority(t, true, peer_id_hint, priority)
.await
}
@@ -3035,6 +3077,7 @@ mod tests {
crate::common::config::PeerConfig {
uri: server_remote_url,
peer_public_key: Some(server_pub_b64.clone()),
priority: crate::common::config::DEFAULT_CONNECTION_PRIORITY,
},
]);
+16 -5
View File
@@ -5,7 +5,8 @@ use crate::{
proto::{
common::Void,
peer_rpc::{
DirectConnectorRpc, GetIpListRequest, GetIpListResponse, SendUdpHolePunchPacketRequest,
DirectConnectorRpc, GetIpListRequest, GetIpListResponse, ListenerInfo,
SendUdpHolePunchPacketRequest,
},
rpc_types::{self, controller::BaseController},
},
@@ -44,13 +45,23 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer {
_: GetIpListRequest,
) -> rpc_types::error::Result<GetIpListResponse> {
let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await;
ret.listeners = self
let listener_configs = self
.global_ctx
.config
.get_mapped_listeners()
.get_mapped_listener_configs()
.into_iter()
.chain(self.global_ctx.get_running_listeners())
.map(Into::into)
.chain(self.global_ctx.get_running_listener_configs())
.collect::<Vec<_>>();
ret.listeners = listener_configs
.iter()
.map(|listener| listener.url.clone().into())
.collect();
ret.listener_infos = listener_configs
.into_iter()
.map(|listener| ListenerInfo {
url: Some(listener.url.into()),
priority: listener.priority,
})
.collect();
remove_easytier_managed_ipv6s(&mut ret, &self.global_ctx);
tracing::trace!(
+15
View File
@@ -141,6 +141,21 @@ pub mod instance {
ret
}
/// Returns the priority of the connection currently used to reach this peer.
///
/// Prefers the connection matching the peer's default conn id; when no
/// connection matches, falls back to the first listed connection's priority.
/// Returns `None` when there is no peer info or no connections at all.
pub fn get_conn_priority(&self) -> Option<u32> {
    let peer_info = self.peer.as_ref()?;
    let default_conn_id = peer_info.default_conn_id.map(|id| id.to_string());
    let mut fallback = None;
    for conn in peer_info.conns.iter() {
        if default_conn_id.as_deref() == Some(conn.conn_id.to_string().as_str()) {
            return Some(conn.priority);
        }
        // Remember only the first connection's priority as the fallback.
        if fallback.is_none() {
            fallback = Some(conn.priority);
        }
    }
    fallback
}
fn get_tunnel_proto_str(tunnel_info: &super::super::common::TunnelInfo) -> String {
tunnel_info.display_tunnel_type()
}
+1
View File
@@ -43,6 +43,7 @@ message StringPatch {
// One add/remove operation applied to a configured URL list (connectors or
// mapped listeners) as part of an instance config patch.
message UrlPatch {
  // Whether the URL is being added or removed.
  ConfigPatchAction action = 1;
  URL the patch applies to.
  common.Url url = 2;
  // Connection priority to associate with the URL (lower is preferred).
  // When unset, the default connection priority is used.
  optional uint32 priority = 3;
}
message AclPatch {
+6 -1
View File
@@ -45,6 +45,7 @@ message PeerConnInfo {
bytes noise_remote_static_pubkey = 12;
peer_rpc.SecureAuthLevel secure_auth_level = 13;
peer_rpc.PeerIdentityType peer_identity_type = 14;
uint32 priority = 15;
}
message PeerInfo {
@@ -208,6 +209,7 @@ enum ConnectorStatus {
// A configured connector together with its runtime status, as reported by
// the connector-list RPC.
message Connector {
  common.Url url = 1;
  ConnectorStatus status = 2;
  // Connection priority of this connector (lower values are preferred).
  uint32 priority = 3;
}
message ListConnectorRequest { InstanceIdentifier instance = 1; }
@@ -218,7 +220,10 @@ service ConnectorManageRpc {
rpc ListConnector(ListConnectorRequest) returns (ListConnectorResponse);
}
message MappedListener { common.Url url = 1; }
message MappedListener {
common.Url url = 1;
uint32 priority = 2;
}
message ListMappedListenerRequest { InstanceIdentifier instance = 1; }
+8
View File
@@ -184,6 +184,12 @@ message GetIpListResponse {
common.Ipv6Addr public_ipv6 = 3;
repeated common.Ipv6Addr interface_ipv6s = 4;
repeated common.Url listeners = 5;
repeated ListenerInfo listener_infos = 6;
}
// A listener URL annotated with its connection priority. Reported in
// GetIpListResponse.listener_infos alongside the older plain-URL
// `listeners` field, which is kept for backward compatibility.
message ListenerInfo {
  common.Url url = 1;
  // Lower values are preferred during connection selection.
  uint32 priority = 2;
}
message SendUdpHolePunchPacketRequest {
@@ -314,6 +320,7 @@ message HandshakeRequest {
repeated string features = 4;
string network_name = 5;
bytes network_secret_digest = 6;
uint32 connection_priority = 7;
}
message KcpConnData {
@@ -346,6 +353,7 @@ message PeerConnNoiseMsg1Pb {
optional uint32 a_session_generation = 3;
common.UUID a_conn_id = 4;
string client_encryption_algorithm = 5;
uint32 connection_priority = 6;
}
message PeerConnNoiseMsg2Pb {
+43 -1
View File
@@ -3,7 +3,7 @@ use std::{
};
use crate::{
common::{dns::socket_addrs, error::Error},
common::{config::DEFAULT_CONNECTION_PRIORITY, dns::socket_addrs, error::Error},
proto::common::TunnelInfo,
};
use async_trait::async_trait;
@@ -139,11 +139,53 @@ pub trait TunnelListener: Send {
/// Client side of a tunnel: something that can dial a remote endpoint and
/// produce a `Tunnel` on success.
pub trait TunnelConnector: Send {
    /// Establish a new tunnel to the endpoint identified by `remote_url`.
    async fn connect(&mut self) -> Result<Box<dyn Tunnel>, TunnelError>;
    /// The URL this connector dials.
    fn remote_url(&self) -> url::Url;
    /// Connection priority for tunnels produced by this connector; lower
    /// values are preferred. Defaults to `DEFAULT_CONNECTION_PRIORITY`.
    fn priority(&self) -> u32 {
        DEFAULT_CONNECTION_PRIORITY
    }
    // Optional hints; the default implementations ignore them.
    fn set_bind_addrs(&mut self, _addrs: Vec<SocketAddr>) {}
    fn set_ip_version(&mut self, _ip_version: IpVersion) {}
    fn set_resolved_addr(&mut self, _addr: SocketAddr) {}
}
/// Decorator that wraps any `TunnelConnector` and overrides only its
/// reported `priority()`, delegating every other trait method to the
/// wrapped connector unchanged.
#[derive(Debug)]
pub struct PrioritizedConnector<C> {
    // The wrapped connector that performs the actual dialing.
    inner: C,
    // Priority to report instead of the inner connector's own value.
    priority: u32,
}

impl<C> PrioritizedConnector<C> {
    /// Wraps `inner`, forcing `priority()` to return the given value.
    pub fn new(inner: C, priority: u32) -> Self {
        Self { inner, priority }
    }
}

#[async_trait]
impl<C: TunnelConnector> TunnelConnector for PrioritizedConnector<C> {
    async fn connect(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
        self.inner.connect().await
    }

    fn remote_url(&self) -> url::Url {
        self.inner.remote_url()
    }

    // The one overridden behavior: report the configured priority.
    fn priority(&self) -> u32 {
        self.priority
    }

    fn set_bind_addrs(&mut self, addrs: Vec<SocketAddr>) {
        self.inner.set_bind_addrs(addrs);
    }

    fn set_ip_version(&mut self, ip_version: IpVersion) {
        self.inner.set_ip_version(ip_version);
    }

    fn set_resolved_addr(&mut self, addr: SocketAddr) {
        self.inner.set_resolved_addr(addr);
    }
}
pub fn build_url_from_socket_addr(addr: &String, scheme: &str) -> url::Url {
if let Ok(sock_addr) = addr.parse::<SocketAddr>() {
let url_str = format!("{}://0.0.0.0", scheme);