From 1b48029bdc3f51240e6315bf15e421412e4a27ad Mon Sep 17 00:00:00 2001 From: KKRainbow <443152178@qq.com> Date: Fri, 1 May 2026 23:30:51 +0800 Subject: [PATCH] fix: clean stale foreign network state (#2197) - clear foreign-network traffic metric peer caches on peer removal and network cleanup - release reserved foreign-network peer IDs on handshake/add-peer error paths - avoid creating no-op foreign-network token buckets when limits are unlimited - shrink relay/session maps after cleanup and remove unused peer-center global data entries --- easytier/src/peer_center/server.rs | 56 +++++++++++-- easytier/src/peers/foreign_network_manager.rs | 83 ++++++++++++++++--- easytier/src/peers/peer_conn.rs | 4 +- easytier/src/peers/peer_manager.rs | 33 ++++++-- easytier/src/peers/peer_session.rs | 6 +- easytier/src/peers/relay_peer_map.rs | 10 ++- easytier/src/peers/traffic_metrics.rs | 13 +++ 7 files changed, 176 insertions(+), 29 deletions(-) diff --git a/easytier/src/peer_center/server.rs b/easytier/src/peer_center/server.rs index 9949ddb2..5a166406 100644 --- a/easytier/src/peer_center/server.rs +++ b/easytier/src/peer_center/server.rs @@ -35,7 +35,7 @@ pub(crate) struct PeerCenterInfoEntry { update_time: std::time::Instant, } -#[derive(Default)] +#[derive(Debug, Default)] pub(crate) struct PeerCenterServerGlobalData { pub(crate) global_peer_map: DashMap, pub(crate) peer_report_time: DashMap, @@ -58,27 +58,33 @@ pub(crate) fn get_global_data(node_id: PeerId) -> Arc, tasks: Arc>, } impl PeerCenterServer { pub fn new(my_node_id: PeerId) -> Self { + let data = get_global_data(my_node_id); + let weak_data = Arc::downgrade(&data); let mut tasks = JoinSet::new(); tasks.spawn(async move { loop { tokio::time::sleep(std::time::Duration::from_secs(10)).await; - PeerCenterServer::clean_outdated_peer(my_node_id).await; + let Some(data) = weak_data.upgrade() else { + break; + }; + PeerCenterServer::clean_outdated_peer_data(&data).await; } }); PeerCenterServer { my_node_id, + data, tasks: Arc::new(tasks), } } - async fn clean_outdated_peer(my_node_id: PeerId) { - let data = get_global_data(my_node_id); + async fn clean_outdated_peer_data(data: &PeerCenterServerGlobalData) { data.peer_report_time.retain(|_, v| { std::time::Instant::now().duration_since(*v) < std::time::Duration::from_secs(180) }); @@ -88,8 +94,7 @@ impl PeerCenterServer { }); } - fn calc_global_digest(my_node_id: PeerId) -> Digest { - let data = get_global_data(my_node_id); + fn calc_global_digest_data(data: &PeerCenterServerGlobalData) -> Digest { let mut hasher = std::collections::hash_map::DefaultHasher::new(); data.global_peer_map .iter() @@ -102,6 +107,18 @@ impl PeerCenterServer { } } +impl Drop for PeerCenterServer { + fn drop(&mut self) { + if Arc::strong_count(&self.tasks) != 1 { + return; + } + + GLOBAL_DATA.remove_if(&self.my_node_id, |_, data| { + Arc::ptr_eq(data, &self.data) && Arc::strong_count(data) <= 2 + }); + } +} + #[async_trait::async_trait] impl PeerCenterRpc for PeerCenterServer { type Controller = BaseController; @@ -117,7 +134,7 @@ impl PeerCenterRpc for PeerCenterServer { tracing::debug!("receive report_peers"); - let data = get_global_data(self.my_node_id); + let data = &self.data; data.peer_report_time .insert(my_peer_id, std::time::Instant::now()); @@ -134,7 +151,7 @@ impl PeerCenterRpc for PeerCenterServer { } data.digest - .store(PeerCenterServer::calc_global_digest(self.my_node_id)); + .store(PeerCenterServer::calc_global_digest_data(data)); Ok(ReportPeersResponse::default()) } @@ -147,7 +164,7 @@ impl PeerCenterRpc for PeerCenterServer { ) -> Result { let digest = req.digest; - let data = get_global_data(self.my_node_id); + let data = &self.data; if digest == data.digest.load() && digest != 0 { return Ok(GetGlobalPeerMapResponse::default()); } @@ -171,3 +188,24 @@ impl PeerCenterRpc for PeerCenterServer { }) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn global_data_removed_when_last_server_drops() { + let peer_id = u32::MAX - 17; + GLOBAL_DATA.remove(&peer_id); + + let server = PeerCenterServer::new(peer_id); + assert!(GLOBAL_DATA.contains_key(&peer_id)); + + let server_clone = server.clone(); + drop(server); + assert!(GLOBAL_DATA.contains_key(&peer_id)); + + drop(server_clone); + assert!(!GLOBAL_DATA.contains_key(&peer_id)); + } +} diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 5f8943e0..31aef0d6 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -87,7 +87,7 @@ struct ForeignNetworkEntry { packet_recv: Mutex>, - bps_limiter: Arc, + bps_limiter: Option>, peer_center: Arc, @@ -191,14 +191,16 @@ impl ForeignNetworkEntry { ); let relay_bps_limit = global_ctx.config.get_flags().foreign_relay_bps_limit; - let limiter_config = LimiterConfig { - burst_rate: None, - bps: Some(relay_bps_limit), - fill_duration_ms: None, - }; - let bps_limiter = global_ctx - .token_bucket_manager() - .get_or_create(&network.network_name, limiter_config.into()); + let bps_limiter = (relay_bps_limit != u64::MAX).then(|| { + let limiter_config = LimiterConfig { + burst_rate: None, + bps: Some(relay_bps_limit), + fill_duration_ms: None, + }; + global_ctx + .token_bucket_manager() + .get_or_create(&network.network_name, limiter_config.into()) + }); let peer_center = Arc::new(PeerCenterInstance::new(Arc::new( PeerMapWithPeerRpcManager { @@ -536,7 +538,9 @@ impl ForeignNetworkEntry { ); continue; } - if !bps_limiter.try_consume(len.into()) { + if let Some(bps_limiter) = bps_limiter.as_ref() + && !bps_limiter.try_consume(len.into()) + { continue; } } @@ -713,6 +717,7 @@ impl ForeignNetworkManagerData { fn remove_network(&self, network_name: &String) { let _l = self.lock.lock().unwrap(); if let Some(old) = self.network_peer_maps.remove(network_name) { + old.1.traffic_metrics.clear_peer_cache(); let to_remove_peers = old.1.peer_map.list_peers(); for p in to_remove_peers { self.peer_network_map.remove_if(&p, |_, v| { @@ -722,6 +727,9 @@ impl ForeignNetworkManagerData { } } self.network_peer_last_update.remove(network_name); + shrink_dashmap(&self.peer_network_map, None); + shrink_dashmap(&self.network_peer_maps, None); + shrink_dashmap(&self.network_peer_last_update, None); } #[allow(clippy::too_many_arguments)] @@ -994,12 +1002,14 @@ impl ForeignNetworkManager { async fn start_event_handler(&self, entry: &ForeignNetworkEntry) { let data = self.data.clone(); let network_name = entry.network.network_name.clone(); + let traffic_metrics = entry.traffic_metrics.clone(); let mut s = entry.global_ctx.subscribe(); self.tasks.lock().unwrap().spawn(async move { while let Ok(e) = s.recv().await { match &e { GlobalCtxEvent::PeerRemoved(peer_id) => { tracing::info!(?e, "remove peer from foreign network manager"); + traffic_metrics.remove_peer(*peer_id); data.remove_peer(*peer_id, &network_name); data.network_peer_last_update .insert(network_name.clone(), SystemTime::now()); @@ -1018,6 +1028,7 @@ impl ForeignNetworkManager { } // if lagged or recv done just remove the network tracing::error!("global event handler at foreign network manager exit"); + traffic_metrics.clear_peer_cache(); data.remove_network(&network_name); }); } @@ -1604,6 +1615,58 @@ pub mod tests { .await; } + #[tokio::test] + async fn foreign_network_peer_removed_clears_traffic_metric_peer_cache() { + let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + + connect_peer_manager(pma_net1.clone(), pm_center.clone()).await; + wait_for_condition( + || { + let pm_center = pm_center.clone(); + async move { + pm_center + .get_foreign_network_manager() + .get_network_peer_id("net1") + .is_some() + } + }, + Duration::from_secs(5), + ) + .await; + + let entry = pm_center + .get_foreign_network_manager() + .data + .get_network_entry("net1") + .unwrap(); + + entry + .traffic_metrics + .record_rx(pma_net1.my_peer_id(), PacketType::Data as u8, 128) + .await; + + assert!( + entry + .traffic_metrics + .contains_peer_cache(pma_net1.my_peer_id()) + ); + + entry + .global_ctx + .issue_event(GlobalCtxEvent::PeerRemoved(pma_net1.my_peer_id())); + + wait_for_condition( + || { + let entry = entry.clone(); + let peer_id = pma_net1.my_peer_id(); + async move { !entry.traffic_metrics.contains_peer_cache(peer_id) } + }, + Duration::from_secs(5), + ) + .await; + } + #[tokio::test] async fn foreign_network_encapsulated_forwarding_records_tx_metrics() { set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1); diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index cac6960f..5e2b6d55 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -1352,7 +1352,9 @@ impl PeerConn { let is_foreign_network = conn_info_for_instrument.network_name != self.global_ctx.get_network_identity().network_name; - let recv_limiter = if is_foreign_network { + let recv_limiter = if is_foreign_network + && self.global_ctx.get_flags().foreign_relay_bps_limit != u64::MAX + { let relay_network_bps_limit = self.global_ctx.get_flags().foreign_relay_bps_limit; let limiter_config = LimiterConfig { burst_rate: None, diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 1c273a6b..c5d03fdd 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -687,6 +687,11 @@ impl PeerManager { Ok(()) } + fn release_reserved_peer_id(&self, network_name: &str) { + self.reserved_my_peer_id_map.remove(network_name); + shrink_dashmap(&self.reserved_my_peer_id_map, None); + } + #[tracing::instrument(ret)] pub async fn add_tunnel_as_server( &self, @@ -702,7 +707,8 @@ impl PeerManager { tunnel, self.peer_session_store.clone(), ); - conn.do_handshake_as_server_ext(|peer, network_name:&str| { + let mut reserved_peer_id_network_name = None; + let handshake_ret = conn.do_handshake_as_server_ext(|peer, network_name:&str| { if network_name == self.global_ctx.get_network_identity().network_name { @@ -713,6 +719,7 @@ impl PeerManager { .foreign_network_manager .get_network_peer_id(network_name); if peer_id.is_none() { + reserved_peer_id_network_name = Some(network_name.to_string()); peer_id = Some(*self.reserved_my_peer_id_map.entry(network_name.to_string()).or_insert_with(|| { rand::random::() }).value()); @@ -728,7 +735,14 @@ impl PeerManager { Ok(()) }) - .await?; + .await; + + if let Err(err) = handshake_ret { + if let Some(network_name) = reserved_peer_id_network_name { + self.release_reserved_peer_id(&network_name); + } + return Err(err); + } let peer_identity = conn.get_network_identity(); let peer_network_name = peer_identity.network_name.clone(); @@ -747,6 +761,7 @@ impl PeerManager { if !is_local_network && self.global_ctx.get_flags().private_mode && !foreign_network_allowed { + self.release_reserved_peer_id(&peer_network_name); return Err(Error::SecretKeyError( "private mode is turned on, foreign network secret mismatch".to_string(), )); @@ -754,14 +769,18 @@ impl PeerManager { conn.set_is_hole_punched(!is_directly_connected); - if is_local_network { - self.add_new_peer_conn(conn).await?; + let add_peer_ret = if is_local_network { + self.add_new_peer_conn(conn).await } else { - self.foreign_network_manager.add_peer_conn(conn).await?; + self.foreign_network_manager.add_peer_conn(conn).await + }; + + if let Err(err) = add_peer_ret { + self.release_reserved_peer_id(&peer_network_name); + return Err(err); } - self.reserved_my_peer_id_map.remove(&peer_network_name); - shrink_dashmap(&self.reserved_my_peer_id_map, None); + self.release_reserved_peer_id(&peer_network_name); tracing::info!("add tunnel as server done"); Ok(()) diff --git a/easytier/src/peers/peer_session.rs b/easytier/src/peers/peer_session.rs index ee6a947a..34599ead 100644 --- a/easytier/src/peers/peer_session.rs +++ b/easytier/src/peers/peer_session.rs @@ -7,7 +7,10 @@ use anyhow::anyhow; use dashmap::DashMap; use super::secure_datagram::{SecureDatagramDirection, SecureDatagramSession}; -use crate::{common::PeerId, tunnel::packet_def::ZCPacket}; +use crate::{ + common::{PeerId, shrink_dashmap}, + tunnel::packet_def::ZCPacket, +}; pub struct UpsertResponderSessionReturn { pub session: Arc, @@ -78,6 +81,7 @@ impl PeerSessionStore { pub fn evict_unused_sessions(&self) { self.sessions .retain(|_key, session| Arc::strong_count(session) > 1); + shrink_dashmap(&self.sessions, None); } #[tracing::instrument(skip(self))] diff --git a/easytier/src/peers/relay_peer_map.rs b/easytier/src/peers/relay_peer_map.rs index 441010aa..db7dd855 100644 --- a/easytier/src/peers/relay_peer_map.rs +++ b/easytier/src/peers/relay_peer_map.rs @@ -9,7 +9,7 @@ use tokio::time::{Duration, timeout}; use crate::peers::foreign_network_client::ForeignNetworkClient; use crate::{ common::error::Error, - common::{PeerId, global_ctx::ArcGlobalCtx}, + common::{PeerId, global_ctx::ArcGlobalCtx, shrink_dashmap}, peers::peer_map::PeerMap, peers::peer_session::{PeerSession, PeerSessionAction, PeerSessionStore, SessionKey}, peers::route_trait::NextHopPolicy, @@ -652,6 +652,10 @@ impl RelayPeerMap { self.handshake_locks.remove(&peer_id); self.pending_packets.remove(&peer_id); } + shrink_dashmap(&self.states, None); + shrink_dashmap(&self.pending_handshakes, None); + shrink_dashmap(&self.handshake_locks, None); + shrink_dashmap(&self.pending_packets, None); } pub fn has_state(&self, peer_id: PeerId) -> bool { @@ -679,6 +683,10 @@ impl RelayPeerMap { self.pending_handshakes.remove(&peer_id); self.handshake_locks.remove(&peer_id); self.pending_packets.remove(&peer_id); + shrink_dashmap(&self.states, None); + shrink_dashmap(&self.pending_handshakes, None); + shrink_dashmap(&self.handshake_locks, None); + shrink_dashmap(&self.pending_packets, None); tracing::debug!(?peer_id, "RelayPeerMap removed peer relay state"); } diff --git a/easytier/src/peers/traffic_metrics.rs b/easytier/src/peers/traffic_metrics.rs index c2eda12b..398aeaa5 100644 --- a/easytier/src/peers/traffic_metrics.rs +++ b/easytier/src/peers/traffic_metrics.rs @@ -201,6 +201,11 @@ impl LogicalTrafficMetrics { self.per_peer.len() } + #[cfg(test)] + fn contains_peer_cache(&self, peer_id: PeerId) -> bool { + self.per_peer.contains_key(&peer_id) + } + fn build_peer_counters(&self, instance_id: &str) -> TrafficCounters { let instance_label = match self.label_kind { InstanceLabelKind::To => LabelType::ToInstanceId(instance_id.to_string()), @@ -333,6 +338,14 @@ impl TrafficMetricRecorder { self.rx_metrics.control.clear_peer_cache(); } + #[cfg(test)] + pub(crate) fn contains_peer_cache(&self, peer_id: PeerId) -> bool { + self.tx_metrics.data.contains_peer_cache(peer_id) + || self.tx_metrics.control.contains_peer_cache(peer_id) + || self.rx_metrics.data.contains_peer_cache(peer_id) + || self.rx_metrics.control.contains_peer_cache(peer_id) + } + fn resolve_instance_id(&self, peer_id: PeerId) -> BoxFuture<'static, Option> { (self.resolve_instance_id)(peer_id) }