diff --git a/easytier/src/peer_center/instance.rs b/easytier/src/peer_center/instance.rs index 461d74b7..81bf32b3 100644 --- a/easytier/src/peer_center/instance.rs +++ b/easytier/src/peer_center/instance.rs @@ -65,7 +65,7 @@ impl PeerCenterBase { return Err(Error::Shutdown); }; rpc_mgr.rpc_server().registry().register( - PeerCenterRpcServer::new(PeerCenterServer::new(self.peer_mgr.my_peer_id())), + PeerCenterRpcServer::new(PeerCenterServer::new()), &self.peer_mgr.get_global_ctx().get_network_name(), ); Ok(()) @@ -486,7 +486,6 @@ impl PeerCenterPeerManagerTrait for PeerMapWithPeerRpcManager { #[cfg(test)] mod tests { use crate::{ - peer_center::server::get_global_data, peers::tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, tunnel::common::tests::wait_for_condition, }; @@ -515,25 +514,6 @@ mod tests { .await .unwrap(); - let center_peer = PeerCenterBase::select_center_peer(&peer_mgr_a) - .await - .unwrap(); - let center_data = get_global_data(center_peer); - - // wait center_data has 3 records for 10 seconds - wait_for_condition( - || async { - if center_data.global_peer_map.len() == 4 { - println!("center data {:#?}", center_data.global_peer_map); - true - } else { - false - } - }, - Duration::from_secs(20), - ) - .await; - let mut digest = None; for pc in peer_centers.iter() { let rpc_service = pc.get_rpc_service(); @@ -578,8 +558,5 @@ mod tests { route_cost.end_update(); assert!(!route_cost.need_update()); } - - let global_digest = get_global_data(center_peer).digest.load(); - assert_eq!(digest.as_ref().unwrap(), &global_digest); } } diff --git a/easytier/src/peer_center/server.rs b/easytier/src/peer_center/server.rs index 5a166406..10129f13 100644 --- a/easytier/src/peer_center/server.rs +++ b/easytier/src/peer_center/server.rs @@ -6,7 +6,6 @@ use std::{ use crossbeam::atomic::AtomicCell; use dashmap::DashMap; -use once_cell::sync::Lazy; use tokio::task::JoinSet; use crate::{ @@ -36,35 +35,21 @@ pub(crate) struct PeerCenterInfoEntry { } #[derive(Debug, Default)] -pub(crate) struct PeerCenterServerGlobalData { - pub(crate) global_peer_map: DashMap, - pub(crate) peer_report_time: DashMap, - pub(crate) digest: AtomicCell, -} - -// a global unique instance for PeerCenterServer -pub(crate) static GLOBAL_DATA: Lazy>> = - Lazy::new(DashMap::new); - -pub(crate) fn get_global_data(node_id: PeerId) -> Arc { - GLOBAL_DATA - .entry(node_id) - .or_insert_with(|| Arc::new(PeerCenterServerGlobalData::default())) - .value() - .clone() +struct PeerCenterServerData { + global_peer_map: DashMap, + peer_report_time: DashMap, + digest: AtomicCell, } #[derive(Clone, Debug)] pub struct PeerCenterServer { - // every peer has its own server, so use per-struct dash map is ok. - my_node_id: PeerId, - data: Arc, + data: Arc, tasks: Arc>, } impl PeerCenterServer { - pub fn new(my_node_id: PeerId) -> Self { - let data = get_global_data(my_node_id); + pub fn new() -> Self { + let data = Arc::new(PeerCenterServerData::default()); let weak_data = Arc::downgrade(&data); let mut tasks = JoinSet::new(); tasks.spawn(async move { @@ -78,13 +63,12 @@ impl PeerCenterServer { }); PeerCenterServer { - my_node_id, data, tasks: Arc::new(tasks), } } - async fn clean_outdated_peer_data(data: &PeerCenterServerGlobalData) { + async fn clean_outdated_peer_data(data: &PeerCenterServerData) { data.peer_report_time.retain(|_, v| { std::time::Instant::now().duration_since(*v) < std::time::Duration::from_secs(180) }); @@ -94,7 +78,7 @@ impl PeerCenterServer { }); } - fn calc_global_digest_data(data: &PeerCenterServerGlobalData) -> Digest { + fn calc_global_digest_data(data: &PeerCenterServerData) -> Digest { let mut hasher = std::collections::hash_map::DefaultHasher::new(); data.global_peer_map .iter() @@ -107,18 +91,6 @@ impl PeerCenterServer { } } -impl Drop for PeerCenterServer { - fn drop(&mut self) { - if Arc::strong_count(&self.tasks) != 1 { - return; - } - - GLOBAL_DATA.remove_if(&self.my_node_id, |_, data| { - Arc::ptr_eq(data, &self.data) && Arc::strong_count(data) <= 2 - }); - } -} - #[async_trait::async_trait] impl PeerCenterRpc for PeerCenterServer { type Controller = BaseController; @@ -194,18 +166,74 @@ mod tests { use super::*; #[tokio::test] - async fn global_data_removed_when_last_server_drops() { - let peer_id = u32::MAX - 17; - GLOBAL_DATA.remove(&peer_id); - - let server = PeerCenterServer::new(peer_id); - assert!(GLOBAL_DATA.contains_key(&peer_id)); - + async fn server_clones_share_instance_data() { + let server = PeerCenterServer::new(); let server_clone = server.clone(); - drop(server); - assert!(GLOBAL_DATA.contains_key(&peer_id)); - drop(server_clone); - assert!(!GLOBAL_DATA.contains_key(&peer_id)); + let mut peers = PeerInfoForGlobalMap::default(); + peers + .direct_peers + .insert(100, DirectConnectedPeerInfo { latency_ms: 3 }); + + server + .report_peers( + BaseController::default(), + ReportPeersRequest { + my_peer_id: 99, + peer_infos: Some(peers), + }, + ) + .await + .unwrap(); + + let resp = server_clone + .get_global_peer_map( + BaseController::default(), + GetGlobalPeerMapRequest { digest: 0 }, + ) + .await + .unwrap(); + assert_eq!(1, resp.global_peer_map.len()); + assert!(resp.global_peer_map[&99].direct_peers.contains_key(&100)); + } + + #[tokio::test] + async fn independent_server_instances_do_not_share_data() { + let server_a = PeerCenterServer::new(); + let server_b = PeerCenterServer::new(); + + let mut peers = PeerInfoForGlobalMap::default(); + peers + .direct_peers + .insert(101, DirectConnectedPeerInfo { latency_ms: 5 }); + + server_a + .report_peers( + BaseController::default(), + ReportPeersRequest { + my_peer_id: 100, + peer_infos: Some(peers), + }, + ) + .await + .unwrap(); + + let resp_a = server_a + .get_global_peer_map( + BaseController::default(), + GetGlobalPeerMapRequest { digest: 0 }, + ) + .await + .unwrap(); + assert_eq!(1, resp_a.global_peer_map.len()); + + let resp_b = server_b + .get_global_peer_map( + BaseController::default(), + GetGlobalPeerMapRequest { digest: 0 }, + ) + .await + .unwrap(); + assert!(resp_b.global_peer_map.is_empty()); } }