From 1178b312fa93b6418cdb7e8d92a8fa25983108e6 Mon Sep 17 00:00:00 2001 From: KKRainbow <443152178@qq.com> Date: Tue, 5 May 2026 11:01:44 +0800 Subject: [PATCH] fix foreign network entry leak (#2211) --- easytier/src/peers/foreign_network_manager.rs | 140 ++++++++++++++++-- 1 file changed, 125 insertions(+), 15 deletions(-) diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 31aef0d6..517137a3 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -6,11 +6,15 @@ in the future, with the help wo peer center we can forward packets of peers that connected to any node in the local network. */ use std::{ - sync::{Arc, Weak}, + sync::{ + Arc, Weak, + atomic::{AtomicBool, Ordering}, + }, time::SystemTime, }; use dashmap::{DashMap, DashSet}; +use guarden::defer; use tokio::{ sync::{ Mutex, @@ -93,6 +97,7 @@ struct ForeignNetworkEntry { stats_mgr: Arc, traffic_metrics: Arc, + event_handler_started: AtomicBool, tasks: Mutex>, @@ -160,10 +165,11 @@ impl ForeignNetworkEntry { InstanceLabelKind::From, )), { - let peer_map = peer_map.clone(); + let peer_map = Arc::downgrade(&peer_map); move |peer_id| { let peer_map = peer_map.clone(); async move { + let peer_map = peer_map.upgrade()?; peer_map .get_route_peer_info(peer_id) .await @@ -230,6 +236,7 @@ impl ForeignNetworkEntry { stats_mgr, traffic_metrics, + event_handler_started: AtomicBool::new(false), tasks: Mutex::new(JoinSet::new()), @@ -674,6 +681,8 @@ struct ForeignNetworkManagerData { network_peer_last_update: DashMap, accessor: Arc>, lock: std::sync::Mutex<()>, + #[cfg(test)] + fail_next_add_peer_conn_after_entry_insert: AtomicBool, } impl ForeignNetworkManagerData { @@ -732,6 +741,36 @@ impl ForeignNetworkManagerData { shrink_dashmap(&self.network_peer_last_update, None); } + fn remove_network_if_current( + &self, + network_name: &String, + expected_entry: &Weak, + ) { + let _l = self.lock.lock().unwrap(); + let Some(expected_entry) = expected_entry.upgrade() else { + return; + }; + let old = self + .network_peer_maps + .remove_if(network_name, |_, entry| Arc::ptr_eq(entry, &expected_entry)); + let Some((_, old)) = old else { + return; + }; + + old.traffic_metrics.clear_peer_cache(); + let to_remove_peers = old.peer_map.list_peers(); + for p in to_remove_peers { + self.peer_network_map.remove_if(&p, |_, v| { + v.remove(network_name); + v.is_empty() + }); + } + self.network_peer_last_update.remove(network_name); + shrink_dashmap(&self.peer_network_map, None); + shrink_dashmap(&self.network_peer_maps, None); + shrink_dashmap(&self.network_peer_last_update, None); + } + #[allow(clippy::too_many_arguments)] async fn get_or_insert_entry( &self, @@ -874,6 +913,8 @@ impl ForeignNetworkManager { network_peer_last_update: DashMap::new(), accessor: Arc::new(accessor), lock: std::sync::Mutex::new(()), + #[cfg(test)] + fail_next_add_peer_conn_after_entry_insert: AtomicBool::new(false), }); let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new())); @@ -891,6 +932,13 @@ impl ForeignNetworkManager { } } + #[cfg(test)] + fn fail_next_add_peer_conn_after_entry_insert(&self) { + self.data + .fail_next_add_peer_conn_after_entry_insert + .store(true, Ordering::Release); + } + pub fn get_network_peer_id(&self, network_name: &str) -> Option { self.data .network_peer_maps @@ -939,6 +987,35 @@ impl ForeignNetworkManager { ) .await; + defer!(rollback_new_entry => sync [ + data = self.data.clone(), + network_name = entry.network.network_name.clone(), + peer_id = peer_conn.get_peer_id(), + should_rollback = new_added + ] { + if should_rollback { + tracing::warn!( + %network_name, + "rollback newly added foreign network entry after add_peer_conn returned error" + ); + data.remove_peer(peer_id, &network_name); + } + }); + + #[cfg(test)] + if self + .data + .fail_next_add_peer_conn_after_entry_insert + .swap(false, Ordering::AcqRel) + { + return Err(anyhow::anyhow!( + "injected add_peer_conn failure after foreign network entry insert" + ) + .into()); + } + + self.ensure_event_handler_started(&entry); + let same_identity = entry.network == peer_network; let peer_identity_type = peer_conn.get_peer_identity_type(); let credential_peer_trusted = peer_digest_empty @@ -952,10 +1029,6 @@ impl ForeignNetworkManager { || credential_identity_mismatch || entry.my_peer_id != peer_conn.get_my_peer_id() { - if new_added { - self.data - .remove_peer(peer_conn.get_peer_id(), &entry.network.network_name.clone()); - } let err = if entry.my_peer_id != peer_conn.get_my_peer_id() { anyhow::anyhow!( "my peer id not match. exp: {:?} real: {:?}, need retry connect", @@ -980,9 +1053,7 @@ impl ForeignNetworkManager { return Err(err.into()); } - if new_added { - self.start_event_handler(&entry).await; - } else if let Some(peer) = entry.peer_map.get_peer_by_id(peer_conn.get_peer_id()) { + if !new_added && let Some(peer) = entry.peer_map.get_peer_by_id(peer_conn.get_peer_id()) { let direct_conns_len = peer.get_directly_connections().len(); let max_count = use_global_var!(MAX_DIRECT_CONNS_PER_PEER_IN_FOREIGN_NETWORK); if direct_conns_len >= max_count as usize { @@ -996,23 +1067,31 @@ impl ForeignNetworkManager { } entry.peer_map.add_new_peer_conn(peer_conn).await?; + let _ = rollback_new_entry.defuse(); Ok(()) } - async fn start_event_handler(&self, entry: &ForeignNetworkEntry) { + fn ensure_event_handler_started(&self, entry: &Arc) { + if entry.event_handler_started.swap(true, Ordering::AcqRel) { + return; + } + let data = self.data.clone(); let network_name = entry.network.network_name.clone(); - let traffic_metrics = entry.traffic_metrics.clone(); + let entry_for_cleanup = Arc::downgrade(entry); + let traffic_metrics = Arc::downgrade(&entry.traffic_metrics); let mut s = entry.global_ctx.subscribe(); self.tasks.lock().unwrap().spawn(async move { while let Ok(e) = s.recv().await { match &e { GlobalCtxEvent::PeerRemoved(peer_id) => { tracing::info!(?e, "remove peer from foreign network manager"); - traffic_metrics.remove_peer(*peer_id); - data.remove_peer(*peer_id, &network_name); + if let Some(traffic_metrics) = traffic_metrics.upgrade() { + traffic_metrics.remove_peer(*peer_id); + } data.network_peer_last_update .insert(network_name.clone(), SystemTime::now()); + data.remove_peer(*peer_id, &network_name); } GlobalCtxEvent::PeerConnRemoved(..) => { tracing::info!(?e, "clear no conn peer from foreign network manager"); @@ -1028,8 +1107,10 @@ impl ForeignNetworkManager { } // if lagged or recv done just remove the network tracing::error!("global event handler at foreign network manager exit"); - traffic_metrics.clear_peer_cache(); - data.remove_network(&network_name); + if let Some(traffic_metrics) = traffic_metrics.upgrade() { + traffic_metrics.clear_peer_cache(); + } + data.remove_network_if_current(&network_name, &entry_for_cleanup); }); } @@ -1615,6 +1696,35 @@ pub mod tests { .await; } + #[tokio::test] + async fn failed_new_foreign_peer_conn_rolls_back_entry_maps() { + let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + let foreign_mgr = pm_center.get_foreign_network_manager(); + + foreign_mgr.fail_next_add_peer_conn_after_entry_insert(); + + let (a_ring, b_ring) = crate::tunnel::ring::create_ring_tunnel_pair(); + let (client_ret, server_ret) = tokio::time::timeout(Duration::from_secs(5), async { + tokio::join!( + pma_net1.add_client_tunnel(a_ring, false), + pm_center.add_tunnel_as_server(b_ring, true) + ) + }) + .await + .unwrap(); + + assert!(client_ret.is_ok()); + assert!(server_ret.is_err()); + assert!(foreign_mgr.data.get_network_entry("net1").is_none()); + assert!( + foreign_mgr + .data + .get_peer_network(pma_net1.my_peer_id()) + .is_none() + ); + } + #[tokio::test] async fn foreign_network_peer_removed_clears_traffic_metric_peer_cache() { let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;