fix foreign network entry leak (#2211)

This commit is contained in:
KKRainbow
2026-05-05 11:01:44 +08:00
committed by GitHub
parent 362aa7a9cd
commit 1178b312fa
+125 -15
View File
@@ -6,11 +6,15 @@ in the future, with the help of peer center we can forward packets of peers that
are connected to any node in the local network.
*/
use std::{
sync::{Arc, Weak},
sync::{
Arc, Weak,
atomic::{AtomicBool, Ordering},
},
time::SystemTime,
};
use dashmap::{DashMap, DashSet};
use guarden::defer;
use tokio::{
sync::{
Mutex,
@@ -93,6 +97,7 @@ struct ForeignNetworkEntry {
stats_mgr: Arc<StatsManager>,
traffic_metrics: Arc<TrafficMetricRecorder>,
event_handler_started: AtomicBool,
tasks: Mutex<JoinSet<()>>,
@@ -160,10 +165,11 @@ impl ForeignNetworkEntry {
InstanceLabelKind::From,
)),
{
let peer_map = peer_map.clone();
let peer_map = Arc::downgrade(&peer_map);
move |peer_id| {
let peer_map = peer_map.clone();
async move {
let peer_map = peer_map.upgrade()?;
peer_map
.get_route_peer_info(peer_id)
.await
@@ -230,6 +236,7 @@ impl ForeignNetworkEntry {
stats_mgr,
traffic_metrics,
event_handler_started: AtomicBool::new(false),
tasks: Mutex::new(JoinSet::new()),
@@ -674,6 +681,8 @@ struct ForeignNetworkManagerData {
network_peer_last_update: DashMap<String, SystemTime>,
accessor: Arc<Box<dyn GlobalForeignNetworkAccessor>>,
lock: std::sync::Mutex<()>,
#[cfg(test)]
fail_next_add_peer_conn_after_entry_insert: AtomicBool,
}
impl ForeignNetworkManagerData {
@@ -732,6 +741,36 @@ impl ForeignNetworkManagerData {
shrink_dashmap(&self.network_peer_last_update, None);
}
/// Drops the foreign network registered under `network_name`, but only when the
/// registered entry is the very same allocation that `expected_entry` refers to.
///
/// The whole operation runs while `self.lock` is held. Nothing happens when
/// `expected_entry` can no longer be upgraded, or when the map currently holds a
/// different entry (or none) for `network_name`.
///
/// On a successful removal this also:
/// * clears the peer cache of the removed entry's traffic metrics,
/// * detaches `network_name` from every peer listed by the removed entry's peer
///   map, deleting peers whose network set becomes empty,
/// * forgets the last-update timestamp of the network,
/// * shrinks the three bookkeeping maps.
fn remove_network_if_current(
    &self,
    network_name: &String,
    expected_entry: &Weak<ForeignNetworkEntry>,
) {
    let _guard = self.lock.lock().unwrap();

    // A dead weak reference means the entry we were asked about is already gone.
    let current = match expected_entry.upgrade() {
        Some(entry) => entry,
        None => return,
    };

    // Pointer identity (not name equality) decides whether the slot is still ours;
    // a newer entry registered under the same name must be left alone.
    let removed = self
        .network_peer_maps
        .remove_if(network_name, |_, candidate| Arc::ptr_eq(candidate, &current));
    let stale = match removed {
        Some((_, entry)) => entry,
        None => return,
    };

    stale.traffic_metrics.clear_peer_cache();

    let peers_in_network = stale.peer_map.list_peers();
    for peer_id in peers_in_network {
        // The predicate first unlinks this network from the peer, then reports
        // whether the peer is left without any network so it can be dropped.
        self.peer_network_map.remove_if(&peer_id, |_, networks| {
            networks.remove(network_name);
            networks.is_empty()
        });
    }

    self.network_peer_last_update.remove(network_name);

    shrink_dashmap(&self.peer_network_map, None);
    shrink_dashmap(&self.network_peer_maps, None);
    shrink_dashmap(&self.network_peer_last_update, None);
}
#[allow(clippy::too_many_arguments)]
async fn get_or_insert_entry(
&self,
@@ -874,6 +913,8 @@ impl ForeignNetworkManager {
network_peer_last_update: DashMap::new(),
accessor: Arc::new(accessor),
lock: std::sync::Mutex::new(()),
#[cfg(test)]
fail_next_add_peer_conn_after_entry_insert: AtomicBool::new(false),
});
let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new()));
@@ -891,6 +932,13 @@ impl ForeignNetworkManager {
}
}
/// Test-only hook: arms the failure-injection flag stored on the shared manager
/// data so that a later check of that flag reports an injected error.
#[cfg(test)]
fn fail_next_add_peer_conn_after_entry_insert(&self) {
    let inject_failure = &self.data.fail_next_add_peer_conn_after_entry_insert;
    inject_failure.store(true, Ordering::Release);
}
pub fn get_network_peer_id(&self, network_name: &str) -> Option<PeerId> {
self.data
.network_peer_maps
@@ -939,6 +987,35 @@ impl ForeignNetworkManager {
)
.await;
defer!(rollback_new_entry => sync [
data = self.data.clone(),
network_name = entry.network.network_name.clone(),
peer_id = peer_conn.get_peer_id(),
should_rollback = new_added
] {
if should_rollback {
tracing::warn!(
%network_name,
"rollback newly added foreign network entry after add_peer_conn returned error"
);
data.remove_peer(peer_id, &network_name);
}
});
#[cfg(test)]
if self
.data
.fail_next_add_peer_conn_after_entry_insert
.swap(false, Ordering::AcqRel)
{
return Err(anyhow::anyhow!(
"injected add_peer_conn failure after foreign network entry insert"
)
.into());
}
self.ensure_event_handler_started(&entry);
let same_identity = entry.network == peer_network;
let peer_identity_type = peer_conn.get_peer_identity_type();
let credential_peer_trusted = peer_digest_empty
@@ -952,10 +1029,6 @@ impl ForeignNetworkManager {
|| credential_identity_mismatch
|| entry.my_peer_id != peer_conn.get_my_peer_id()
{
if new_added {
self.data
.remove_peer(peer_conn.get_peer_id(), &entry.network.network_name.clone());
}
let err = if entry.my_peer_id != peer_conn.get_my_peer_id() {
anyhow::anyhow!(
"my peer id not match. exp: {:?} real: {:?}, need retry connect",
@@ -980,9 +1053,7 @@ impl ForeignNetworkManager {
return Err(err.into());
}
if new_added {
self.start_event_handler(&entry).await;
} else if let Some(peer) = entry.peer_map.get_peer_by_id(peer_conn.get_peer_id()) {
if !new_added && let Some(peer) = entry.peer_map.get_peer_by_id(peer_conn.get_peer_id()) {
let direct_conns_len = peer.get_directly_connections().len();
let max_count = use_global_var!(MAX_DIRECT_CONNS_PER_PEER_IN_FOREIGN_NETWORK);
if direct_conns_len >= max_count as usize {
@@ -996,23 +1067,31 @@ impl ForeignNetworkManager {
}
entry.peer_map.add_new_peer_conn(peer_conn).await?;
let _ = rollback_new_entry.defuse();
Ok(())
}
async fn start_event_handler(&self, entry: &ForeignNetworkEntry) {
fn ensure_event_handler_started(&self, entry: &Arc<ForeignNetworkEntry>) {
if entry.event_handler_started.swap(true, Ordering::AcqRel) {
return;
}
let data = self.data.clone();
let network_name = entry.network.network_name.clone();
let traffic_metrics = entry.traffic_metrics.clone();
let entry_for_cleanup = Arc::downgrade(entry);
let traffic_metrics = Arc::downgrade(&entry.traffic_metrics);
let mut s = entry.global_ctx.subscribe();
self.tasks.lock().unwrap().spawn(async move {
while let Ok(e) = s.recv().await {
match &e {
GlobalCtxEvent::PeerRemoved(peer_id) => {
tracing::info!(?e, "remove peer from foreign network manager");
traffic_metrics.remove_peer(*peer_id);
data.remove_peer(*peer_id, &network_name);
if let Some(traffic_metrics) = traffic_metrics.upgrade() {
traffic_metrics.remove_peer(*peer_id);
}
data.network_peer_last_update
.insert(network_name.clone(), SystemTime::now());
data.remove_peer(*peer_id, &network_name);
}
GlobalCtxEvent::PeerConnRemoved(..) => {
tracing::info!(?e, "clear no conn peer from foreign network manager");
@@ -1028,8 +1107,10 @@ impl ForeignNetworkManager {
}
// if lagged or recv done just remove the network
tracing::error!("global event handler at foreign network manager exit");
traffic_metrics.clear_peer_cache();
data.remove_network(&network_name);
if let Some(traffic_metrics) = traffic_metrics.upgrade() {
traffic_metrics.clear_peer_cache();
}
data.remove_network_if_current(&network_name, &entry_for_cleanup);
});
}
@@ -1615,6 +1696,35 @@ pub mod tests {
.await;
}
/// Arms the injected failure on the center node's foreign network manager, lets a
/// peer from "net1" connect over a ring tunnel, and checks that the failed
/// server-side attempt leaves neither a "net1" entry nor a peer-to-network
/// mapping for that peer behind.
#[tokio::test]
async fn failed_new_foreign_peer_conn_rolls_back_entry_maps() {
    let center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
    let net1_peer = create_mock_peer_manager_for_foreign_network("net1").await;

    let manager = center.get_foreign_network_manager();
    manager.fail_next_add_peer_conn_after_entry_insert();

    let (client_side, server_side) = crate::tunnel::ring::create_ring_tunnel_pair();

    // Drive both ends of the handshake concurrently; the timeout keeps a stuck
    // handshake from hanging the test.
    let handshake = async {
        tokio::join!(
            net1_peer.add_client_tunnel(client_side, false),
            center.add_tunnel_as_server(server_side, true)
        )
    };
    let (client_ret, server_ret) = tokio::time::timeout(Duration::from_secs(5), handshake)
        .await
        .unwrap();

    // The client end succeeds while the server end reports the injected error.
    assert!(client_ret.is_ok());
    assert!(server_ret.is_err());

    // Nothing about the failed attempt may remain in the bookkeeping maps.
    assert!(manager.data.get_network_entry("net1").is_none());
    assert!(
        manager
            .data
            .get_peer_network(net1_peer.my_peer_id())
            .is_none()
    );
}
#[tokio::test]
async fn foreign_network_peer_removed_clears_traffic_metric_peer_cache() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;