mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-07 02:09:06 +00:00
handle close peer conn correctly (#1082)
This commit is contained in:
@@ -211,10 +211,7 @@ impl DirectConnectorManagerData {
|
|||||||
dst_peer_id,
|
dst_peer_id,
|
||||||
peer_id
|
peer_id
|
||||||
);
|
);
|
||||||
self.peer_manager
|
self.peer_manager.close_peer_conn(peer_id, &conn_id).await?;
|
||||||
.get_peer_map()
|
|
||||||
.close_peer_conn(peer_id, &conn_id)
|
|
||||||
.await?;
|
|
||||||
return Err(Error::InvalidUrl(addr));
|
return Err(Error::InvalidUrl(addr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ use std::{
|
|||||||
time::SystemTime,
|
time::SystemTime,
|
||||||
};
|
};
|
||||||
|
|
||||||
use dashmap::DashMap;
|
use dashmap::{DashMap, DashSet};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{
|
sync::{
|
||||||
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
||||||
@@ -367,14 +367,14 @@ impl Drop for ForeignNetworkEntry {
|
|||||||
|
|
||||||
struct ForeignNetworkManagerData {
|
struct ForeignNetworkManagerData {
|
||||||
network_peer_maps: DashMap<String, Arc<ForeignNetworkEntry>>,
|
network_peer_maps: DashMap<String, Arc<ForeignNetworkEntry>>,
|
||||||
peer_network_map: DashMap<PeerId, String>,
|
peer_network_map: DashMap<PeerId, DashSet<String>>,
|
||||||
network_peer_last_update: DashMap<String, SystemTime>,
|
network_peer_last_update: DashMap<String, SystemTime>,
|
||||||
accessor: Arc<Box<dyn GlobalForeignNetworkAccessor>>,
|
accessor: Arc<Box<dyn GlobalForeignNetworkAccessor>>,
|
||||||
lock: std::sync::Mutex<()>,
|
lock: std::sync::Mutex<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ForeignNetworkManagerData {
|
impl ForeignNetworkManagerData {
|
||||||
fn get_peer_network(&self, peer_id: PeerId) -> Option<String> {
|
fn get_peer_network(&self, peer_id: PeerId) -> Option<DashSet<String>> {
|
||||||
self.peer_network_map.get(&peer_id).map(|v| v.clone())
|
self.peer_network_map.get(&peer_id).map(|v| v.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -384,7 +384,10 @@ impl ForeignNetworkManagerData {
|
|||||||
|
|
||||||
fn remove_peer(&self, peer_id: PeerId, network_name: &String) {
|
fn remove_peer(&self, peer_id: PeerId, network_name: &String) {
|
||||||
let _l = self.lock.lock().unwrap();
|
let _l = self.lock.lock().unwrap();
|
||||||
self.peer_network_map.remove(&peer_id);
|
self.peer_network_map.remove_if(&peer_id, |_, v| {
|
||||||
|
let _ = v.remove(network_name);
|
||||||
|
v.is_empty()
|
||||||
|
});
|
||||||
if let Some(_) = self
|
if let Some(_) = self
|
||||||
.network_peer_maps
|
.network_peer_maps
|
||||||
.remove_if(network_name, |_, v| v.peer_map.is_empty())
|
.remove_if(network_name, |_, v| v.peer_map.is_empty())
|
||||||
@@ -406,7 +409,10 @@ impl ForeignNetworkManagerData {
|
|||||||
|
|
||||||
fn remove_network(&self, network_name: &String) {
|
fn remove_network(&self, network_name: &String) {
|
||||||
let _l = self.lock.lock().unwrap();
|
let _l = self.lock.lock().unwrap();
|
||||||
self.peer_network_map.retain(|_, v| v != network_name);
|
self.peer_network_map.iter().for_each(|v| {
|
||||||
|
v.value().remove(network_name);
|
||||||
|
});
|
||||||
|
self.peer_network_map.retain(|_, v| !v.is_empty());
|
||||||
self.network_peer_maps.remove(network_name);
|
self.network_peer_maps.remove(network_name);
|
||||||
self.network_peer_last_update.remove(network_name);
|
self.network_peer_last_update.remove(network_name);
|
||||||
}
|
}
|
||||||
@@ -439,7 +445,9 @@ impl ForeignNetworkManagerData {
|
|||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
self.peer_network_map
|
self.peer_network_map
|
||||||
.insert(dst_peer_id, network_identity.network_name.clone());
|
.entry(dst_peer_id)
|
||||||
|
.or_insert_with(|| DashSet::new())
|
||||||
|
.insert(network_identity.network_name.clone());
|
||||||
|
|
||||||
self.network_peer_last_update
|
self.network_peer_last_update
|
||||||
.insert(network_identity.network_name.clone(), SystemTime::now());
|
.insert(network_identity.network_name.clone(), SystemTime::now());
|
||||||
@@ -665,6 +673,23 @@ impl ForeignNetworkManager {
|
|||||||
Err(Error::RouteError(Some("network not found".to_string())))
|
Err(Error::RouteError(Some("network not found".to_string())))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn close_peer_conn(
|
||||||
|
&self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
conn_id: &super::peer_conn::PeerConnId,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let network_names = self.data.get_peer_network(peer_id).unwrap_or_default();
|
||||||
|
for network_name in network_names {
|
||||||
|
if let Some(entry) = self.data.get_network_entry(&network_name) {
|
||||||
|
let ret = entry.peer_map.close_peer_conn(peer_id, conn_id).await;
|
||||||
|
if ret.is_ok() || !matches!(ret.as_ref().unwrap_err(), Error::NotFound) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(Error::NotFound)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for ForeignNetworkManager {
|
impl Drop for ForeignNetworkManager {
|
||||||
|
|||||||
@@ -1128,6 +1128,35 @@ impl PeerManager {
|
|||||||
|
|
||||||
self.peer_rpc_mgr.rpc_server().registry().unregister_all();
|
self.peer_rpc_mgr.rpc_server().registry().unregister_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn close_peer_conn(
|
||||||
|
&self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
conn_id: &PeerConnId,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let ret = self.peers.close_peer_conn(peer_id, conn_id).await;
|
||||||
|
tracing::info!("close_peer_conn in peer map: {:?}", ret);
|
||||||
|
if ret.is_ok() || !matches!(ret.as_ref().unwrap_err(), Error::NotFound) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
let ret = self
|
||||||
|
.foreign_network_client
|
||||||
|
.get_peer_map()
|
||||||
|
.close_peer_conn(peer_id, conn_id)
|
||||||
|
.await;
|
||||||
|
tracing::info!("close_peer_conn in foreign network client: {:?}", ret);
|
||||||
|
if ret.is_ok() || !matches!(ret.as_ref().unwrap_err(), Error::NotFound) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
let ret = self
|
||||||
|
.foreign_network_manager
|
||||||
|
.close_peer_conn(peer_id, conn_id)
|
||||||
|
.await;
|
||||||
|
tracing::info!("close_peer_conn in foreign network manager done: {:?}", ret);
|
||||||
|
ret
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -1147,7 +1176,10 @@ mod tests {
|
|||||||
peer_manager::RouteAlgoType,
|
peer_manager::RouteAlgoType,
|
||||||
peer_rpc::tests::register_service,
|
peer_rpc::tests::register_service,
|
||||||
route_trait::NextHopPolicy,
|
route_trait::NextHopPolicy,
|
||||||
tests::{connect_peer_manager, wait_route_appear, wait_route_appear_with_cost},
|
tests::{
|
||||||
|
connect_peer_manager, create_mock_peer_manager_with_name, wait_route_appear,
|
||||||
|
wait_route_appear_with_cost,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
proto::common::{CompressionAlgoPb, NatType, PeerFeatureFlag},
|
proto::common::{CompressionAlgoPb, NatType, PeerFeatureFlag},
|
||||||
tunnel::{
|
tunnel::{
|
||||||
@@ -1427,4 +1459,127 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn close_conn_in_peer_map() {
|
||||||
|
let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
|
||||||
|
let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
|
||||||
|
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
|
||||||
|
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let conns = peer_mgr_a
|
||||||
|
.get_peer_map()
|
||||||
|
.list_peer_conns(peer_mgr_b.my_peer_id)
|
||||||
|
.await;
|
||||||
|
assert!(conns.is_some());
|
||||||
|
let conn_info = conns.as_ref().unwrap().first().unwrap();
|
||||||
|
|
||||||
|
peer_mgr_a
|
||||||
|
.close_peer_conn(peer_mgr_b.my_peer_id, &conn_info.conn_id.parse().unwrap())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
wait_for_condition(
|
||||||
|
|| async {
|
||||||
|
let peers = peer_mgr_a.list_peers().await;
|
||||||
|
peers.is_empty()
|
||||||
|
},
|
||||||
|
Duration::from_secs(10),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
// a is client, b is server
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn close_conn_in_foreign_network_client() {
|
||||||
|
let peer_mgr_server = create_mock_peer_manager_with_name("server".to_string()).await;
|
||||||
|
let peer_mgr_client = create_mock_peer_manager_with_name("client".to_string()).await;
|
||||||
|
connect_peer_manager(peer_mgr_client.clone(), peer_mgr_server.clone()).await;
|
||||||
|
wait_for_condition(
|
||||||
|
|| async {
|
||||||
|
peer_mgr_client
|
||||||
|
.get_foreign_network_client()
|
||||||
|
.list_public_peers()
|
||||||
|
.await
|
||||||
|
.len()
|
||||||
|
== 1
|
||||||
|
},
|
||||||
|
Duration::from_secs(3),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let peer_id = peer_mgr_client
|
||||||
|
.foreign_network_client
|
||||||
|
.list_public_peers()
|
||||||
|
.await[0];
|
||||||
|
let conns = peer_mgr_client
|
||||||
|
.foreign_network_client
|
||||||
|
.get_peer_map()
|
||||||
|
.list_peer_conns(peer_id)
|
||||||
|
.await;
|
||||||
|
assert!(conns.is_some());
|
||||||
|
let conn_info = conns.as_ref().unwrap().first().unwrap();
|
||||||
|
peer_mgr_client
|
||||||
|
.close_peer_conn(peer_id, &conn_info.conn_id.parse().unwrap())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
wait_for_condition(
|
||||||
|
|| async {
|
||||||
|
peer_mgr_client
|
||||||
|
.get_foreign_network_client()
|
||||||
|
.list_public_peers()
|
||||||
|
.await
|
||||||
|
.len()
|
||||||
|
== 0
|
||||||
|
},
|
||||||
|
Duration::from_secs(10),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn close_conn_in_foreign_network_manager() {
|
||||||
|
let peer_mgr_server = create_mock_peer_manager_with_name("server".to_string()).await;
|
||||||
|
let peer_mgr_client = create_mock_peer_manager_with_name("client".to_string()).await;
|
||||||
|
connect_peer_manager(peer_mgr_client.clone(), peer_mgr_server.clone()).await;
|
||||||
|
wait_for_condition(
|
||||||
|
|| async {
|
||||||
|
peer_mgr_client
|
||||||
|
.get_foreign_network_client()
|
||||||
|
.list_public_peers()
|
||||||
|
.await
|
||||||
|
.len()
|
||||||
|
== 1
|
||||||
|
},
|
||||||
|
Duration::from_secs(3),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let conns = peer_mgr_server
|
||||||
|
.foreign_network_manager
|
||||||
|
.list_foreign_networks()
|
||||||
|
.await;
|
||||||
|
let client_info = conns.foreign_networks["client"].peers[0].clone();
|
||||||
|
let conn_info = client_info.conns[0].clone();
|
||||||
|
peer_mgr_server
|
||||||
|
.close_peer_conn(client_info.peer_id, &conn_info.conn_id.parse().unwrap())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
wait_for_condition(
|
||||||
|
|| async {
|
||||||
|
peer_mgr_client
|
||||||
|
.get_foreign_network_client()
|
||||||
|
.list_public_peers()
|
||||||
|
.await
|
||||||
|
.len()
|
||||||
|
== 0
|
||||||
|
},
|
||||||
|
Duration::from_secs(10),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user