support forward foreign network packet between peers

This commit is contained in:
sijie.sun
2024-09-27 21:42:11 +08:00
committed by Sijie.Sun
parent a50bcf3087
commit ff5ee8a05e
11 changed files with 776 additions and 128 deletions
+368 -69
View File
@@ -42,10 +42,16 @@ use super::{
peer_ospf_route::PeerRoute,
peer_rpc::{PeerRpcManager, PeerRpcManagerTransport},
peer_rpc_service::DirectConnectorManagerRpcServer,
route_trait::{ArcRoute, NextHopPolicy},
route_trait::NextHopPolicy,
PacketRecvChan, PacketRecvChanReceiver,
};
#[async_trait::async_trait]
#[auto_impl::auto_impl(&, Box, Arc)]
pub trait GlobalForeignNetworkAccessor: Send + Sync + 'static {
async fn list_global_foreign_peer(&self, network_identity: &NetworkIdentity) -> Vec<PeerId>;
}
struct ForeignNetworkEntry {
my_peer_id: PeerId,
@@ -53,10 +59,9 @@ struct ForeignNetworkEntry {
network: NetworkIdentity,
peer_map: Arc<PeerMap>,
relay_data: bool,
pm_packet_sender: Mutex<Option<PacketRecvChan>>,
route: ArcRoute,
peer_rpc: Weak<PeerRpcManager>,
peer_rpc: Arc<PeerRpcManager>,
rpc_sender: UnboundedSender<ZCPacket>,
packet_recv: Mutex<Option<PacketRecvChanReceiver>>,
@@ -70,10 +75,11 @@ impl ForeignNetworkEntry {
global_ctx: ArcGlobalCtx,
my_peer_id: PeerId,
relay_data: bool,
pm_packet_sender: PacketRecvChan,
) -> Self {
let foreign_global_ctx = Self::build_foreign_global_ctx(&network, global_ctx.clone());
let (packet_sender, packet_recv) = mpsc::channel(1000);
let (packet_sender, packet_recv) = mpsc::channel(64);
let peer_map = Arc::new(PeerMap::new(
packet_sender,
@@ -83,7 +89,6 @@ impl ForeignNetworkEntry {
let (peer_rpc, rpc_transport_sender) = Self::build_rpc_tspt(my_peer_id, peer_map.clone());
let route = PeerRoute::new(my_peer_id, foreign_global_ctx.clone(), peer_rpc.clone());
peer_rpc.rpc_server().registry().register(
DirectConnectorRpcServer::new(DirectConnectorManagerRpcServer::new(
foreign_global_ctx.clone(),
@@ -98,9 +103,9 @@ impl ForeignNetworkEntry {
network,
peer_map,
relay_data,
route: Arc::new(Box::new(route)),
pm_packet_sender: Mutex::new(Some(pm_packet_sender)),
peer_rpc: Arc::downgrade(&peer_rpc),
peer_rpc,
rpc_sender: rpc_transport_sender,
packet_recv: Mutex::new(Some(packet_recv)),
@@ -160,9 +165,8 @@ impl ForeignNetworkEntry {
.upgrade()
.ok_or(anyhow::anyhow!("peer map is gone"))?;
peer_map
.send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop)
.await
// send to ourselves so we can handle it in forward logic.
peer_map.send_msg_directly(msg, self.my_peer_id).await
}
async fn recv(&self) -> Result<ZCPacket, Error> {
@@ -186,10 +190,16 @@ impl ForeignNetworkEntry {
(peer_rpc, rpc_transport_sender)
}
async fn prepare_route(&self, my_peer_id: PeerId) {
async fn prepare_route(
&self,
my_peer_id: PeerId,
accessor: Box<dyn GlobalForeignNetworkAccessor>,
) {
struct Interface {
my_peer_id: PeerId,
peer_map: Weak<PeerMap>,
network_identity: NetworkIdentity,
accessor: Box<dyn GlobalForeignNetworkAccessor>,
}
#[async_trait::async_trait]
@@ -199,7 +209,17 @@ impl ForeignNetworkEntry {
return vec![];
};
peer_map.list_peers_with_conn().await
let mut global = self
.accessor
.list_global_foreign_peer(&self.network_identity)
.await;
let local = peer_map.list_peers_with_conn().await;
tracing::debug!(?global, ?local, ?self.my_peer_id, "list peers in foreign network manager");
global.extend(local.iter().cloned());
global
.into_iter()
.filter(|x| *x != self.my_peer_id)
.collect()
}
fn my_peer_id(&self) -> PeerId {
@@ -207,15 +227,18 @@ impl ForeignNetworkEntry {
}
}
self.route
let route = PeerRoute::new(my_peer_id, self.global_ctx.clone(), self.peer_rpc.clone());
route
.open(Box::new(Interface {
my_peer_id,
network_identity: self.network.clone(),
peer_map: Arc::downgrade(&self.peer_map),
accessor,
}))
.await
.unwrap();
self.peer_map.add_route(self.route.clone()).await;
self.peer_map.add_route(Arc::new(Box::new(route))).await;
}
async fn start_packet_recv(&self) {
@@ -224,10 +247,12 @@ impl ForeignNetworkEntry {
let rpc_sender = self.rpc_sender.clone();
let peer_map = self.peer_map.clone();
let relay_data = self.relay_data;
let pm_sender = self.pm_packet_sender.lock().await.take().unwrap();
let network_name = self.network.network_name.clone();
self.tasks.lock().await.spawn(async move {
while let Some(packet_bytes) = recv.recv().await {
let Some(hdr) = packet_bytes.peer_manager_header() else {
while let Some(zc_packet) = recv.recv().await {
let Some(hdr) = zc_packet.peer_manager_header() else {
tracing::warn!("invalid packet, skip");
continue;
};
@@ -238,7 +263,7 @@ impl ForeignNetworkEntry {
|| hdr.packet_type == PacketType::RpcReq as u8
|| hdr.packet_type == PacketType::RpcResp as u8
{
rpc_sender.send(packet_bytes).unwrap();
rpc_sender.send(zc_packet).unwrap();
continue;
}
tracing::trace!(?hdr, "ignore packet in foreign network");
@@ -247,32 +272,55 @@ impl ForeignNetworkEntry {
continue;
}
let ret = peer_map
.send_msg(packet_bytes, to_peer_id, NextHopPolicy::LeastHop)
let gateway_peer_id = peer_map
.get_gateway_peer_id(to_peer_id, NextHopPolicy::LeastHop)
.await;
if ret.is_err() {
tracing::error!("forward packet to peer failed: {:?}", ret.err());
if gateway_peer_id.is_some() && peer_map.has_peer(gateway_peer_id.unwrap()) {
if let Err(e) = peer_map
.send_msg_directly(zc_packet, gateway_peer_id.unwrap())
.await
{
tracing::error!(
?e,
"send packet to foreign peer inside peer map failed"
);
}
} else {
let mut foreign_packet = ZCPacket::new_for_foreign_network(
&network_name,
to_peer_id,
&zc_packet,
);
foreign_packet.fill_peer_manager_hdr(
my_node_id,
gateway_peer_id.unwrap_or(to_peer_id),
PacketType::ForeignNetworkPacket as u8,
);
if let Err(e) = pm_sender.send(foreign_packet).await {
tracing::error!("send packet to peer with pm failed: {:?}", e);
}
}
}
}
});
}
async fn prepare(&self, my_peer_id: PeerId) {
self.prepare_route(my_peer_id).await;
async fn prepare(&self, my_peer_id: PeerId, accessor: Box<dyn GlobalForeignNetworkAccessor>) {
self.prepare_route(my_peer_id, accessor).await;
self.start_packet_recv().await;
self.peer_rpc.upgrade().unwrap().run();
self.peer_rpc.run();
}
}
impl Drop for ForeignNetworkEntry {
fn drop(&mut self) {
if let Some(peer_rpc) = self.peer_rpc.upgrade() {
peer_rpc
.rpc_server()
.registry()
.unregister_by_domain(&self.network.network_name);
}
self.peer_rpc
.rpc_server()
.registry()
.unregister_by_domain(&self.network.network_name);
tracing::debug!(self.my_peer_id, ?self.network, "drop foreign network entry");
}
}
@@ -280,6 +328,7 @@ struct ForeignNetworkManagerData {
network_peer_maps: DashMap<String, Arc<ForeignNetworkEntry>>,
peer_network_map: DashMap<PeerId, String>,
network_peer_last_update: DashMap<String, SystemTime>,
accessor: Arc<Box<dyn GlobalForeignNetworkAccessor>>,
lock: std::sync::Mutex<()>,
}
@@ -319,6 +368,50 @@ impl ForeignNetworkManagerData {
self.network_peer_maps.remove(network_name);
self.network_peer_last_update.remove(network_name);
}
async fn get_or_insert_entry(
&self,
network_identity: &NetworkIdentity,
my_peer_id: PeerId,
dst_peer_id: PeerId,
relay_data: bool,
global_ctx: &ArcGlobalCtx,
pm_packet_sender: &PacketRecvChan,
) -> (Arc<ForeignNetworkEntry>, bool) {
let mut new_added = false;
let l = self.lock.lock().unwrap();
let entry = self
.network_peer_maps
.entry(network_identity.network_name.clone())
.or_insert_with(|| {
new_added = true;
Arc::new(ForeignNetworkEntry::new(
network_identity.clone(),
global_ctx.clone(),
my_peer_id,
relay_data,
pm_packet_sender.clone(),
))
})
.clone();
self.peer_network_map
.insert(dst_peer_id, network_identity.network_name.clone());
self.network_peer_last_update
.insert(network_identity.network_name.clone(), SystemTime::now());
drop(l);
if new_added {
entry
.prepare(my_peer_id, Box::new(self.accessor.clone()))
.await;
}
(entry, new_added)
}
}
pub const FOREIGN_NETWORK_SERVICE_ID: u32 = 1;
@@ -338,11 +431,13 @@ impl ForeignNetworkManager {
my_peer_id: PeerId,
global_ctx: ArcGlobalCtx,
packet_sender_to_mgr: PacketRecvChan,
accessor: Box<dyn GlobalForeignNetworkAccessor>,
) -> Self {
let data = Arc::new(ForeignNetworkManagerData {
network_peer_maps: DashMap::new(),
peer_network_map: DashMap::new(),
network_peer_last_update: DashMap::new(),
accessor: Arc::new(accessor),
lock: std::sync::Mutex::new(()),
});
@@ -381,44 +476,23 @@ impl ForeignNetworkManager {
return ret;
}
let mut new_added = false;
let entry = {
let _l = self.data.lock.lock().unwrap();
let entry = self
.data
.network_peer_maps
.entry(peer_conn.get_network_identity().network_name.clone())
.or_insert_with(|| {
new_added = true;
Arc::new(ForeignNetworkEntry::new(
peer_conn.get_network_identity(),
self.global_ctx.clone(),
self.my_peer_id,
!ret.is_err(),
))
})
.clone();
self.data.peer_network_map.insert(
let (entry, new_added) = self
.data
.get_or_insert_entry(
&peer_conn.get_network_identity(),
self.my_peer_id,
peer_conn.get_peer_id(),
peer_conn.get_network_identity().network_name.clone(),
);
self.data.network_peer_last_update.insert(
peer_conn.get_network_identity().network_name.clone(),
SystemTime::now(),
);
entry
};
if new_added {
entry.prepare(self.my_peer_id).await;
self.start_event_handler(&entry).await;
}
!ret.is_err(),
&self.global_ctx,
&self.packet_sender_to_mgr,
)
.await;
if entry.network != peer_conn.get_network_identity() {
if new_added {
self.data
.remove_network(&entry.network.network_name.clone());
}
return Err(anyhow::anyhow!(
"network secret not match. exp: {:?} real: {:?}",
entry.network,
@@ -427,6 +501,10 @@ impl ForeignNetworkManager {
.into());
}
if new_added {
self.start_event_handler(&entry).await;
}
Ok(entry.peer_map.add_new_peer_conn(peer_conn).await)
}
@@ -480,7 +558,14 @@ impl ForeignNetworkManager {
continue;
};
let mut entry = ForeignNetworkEntryPb::default();
let mut entry = ForeignNetworkEntryPb {
network_secret_digest: item
.network
.network_secret_digest
.unwrap_or_default()
.to_vec(),
..Default::default()
};
for peer in item.peer_map.list_peers().await {
let mut peer_info = PeerInfo::default();
peer_info.peer_id = peer;
@@ -499,6 +584,22 @@ impl ForeignNetworkManager {
.get(network_name)
.map(|v| v.clone())
}
pub async fn send_msg_to_peer(
&self,
network_name: &str,
dst_peer_id: PeerId,
msg: ZCPacket,
) -> Result<(), Error> {
if let Some(entry) = self.data.get_network_entry(network_name) {
entry
.peer_map
.send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop)
.await
} else {
Err(Error::RouteError(Some("network not found".to_string())))
}
}
}
impl Drop for ForeignNetworkManager {
@@ -522,18 +623,22 @@ mod tests {
tests::{connect_peer_manager, wait_route_appear},
},
proto::common::NatType,
set_global_var,
tunnel::common::tests::wait_for_condition,
};
use super::*;
async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc<PeerManager> {
async fn create_mock_peer_manager_for_foreign_network_ext(
network: &str,
secret: &str,
) -> Arc<PeerManager> {
let (s, _r) = tokio::sync::mpsc::channel(1000);
let peer_mgr = Arc::new(PeerManager::new(
RouteAlgoType::Ospf,
get_mock_global_ctx_with_network(Some(NetworkIdentity::new(
network.to_string(),
network.to_string(),
secret.to_string(),
))),
s,
));
@@ -542,6 +647,10 @@ mod tests {
peer_mgr
}
async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc<PeerManager> {
create_mock_peer_manager_for_foreign_network_ext(network, network).await
}
#[tokio::test]
async fn foreign_network_basic() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
@@ -780,4 +889,194 @@ mod tests {
)
.await;
}
#[tokio::test]
async fn test_foreign_network_manager_cluster() {
set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1);
let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let pm_center2 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let pm_center3 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
connect_peer_manager(pm_center1.clone(), pm_center2.clone()).await;
connect_peer_manager(pm_center2.clone(), pm_center3.clone()).await;
tracing::debug!(
"pm_center: {:?}, pm_center2: {:?}",
pm_center1.my_peer_id(),
pm_center2.my_peer_id()
);
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
connect_peer_manager(pma_net1.clone(), pm_center1.clone()).await;
connect_peer_manager(pmb_net1.clone(), pm_center2.clone()).await;
tracing::debug!(
"pma_net1: {:?}, pmb_net1: {:?}",
pma_net1.my_peer_id(),
pmb_net1.my_peer_id()
);
wait_route_appear(pma_net1.clone(), pmb_net1.clone())
.await
.unwrap();
assert_eq!(3, pma_net1.list_routes().await.len(),);
let pmc_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
connect_peer_manager(pmc_net1.clone(), pm_center3.clone()).await;
wait_route_appear(pma_net1.clone(), pmc_net1.clone())
.await
.unwrap();
assert_eq!(5, pma_net1.list_routes().await.len(),);
println!(
"pm_center1: {:?}, pm_center2: {:?}, pm_center3: {:?}",
pm_center1.my_peer_id(),
pm_center2.my_peer_id(),
pm_center3.my_peer_id()
);
println!(
"pma_net1: {:?}, pmb_net1: {:?}, pmc_net1: {:?}",
pma_net1.my_peer_id(),
pmb_net1.my_peer_id(),
pmc_net1.my_peer_id()
);
println!("drop pmc_net1, id: {:?}", pmc_net1.my_peer_id());
// foreign network node disconnect
drop(pmc_net1);
wait_for_condition(
|| async { pma_net1.list_routes().await.len() == 3 },
Duration::from_secs(15),
)
.await;
println!("drop pm_center1, id: {:?}", pm_center1.my_peer_id());
drop(pm_center1);
wait_for_condition(
|| async { pma_net1.list_routes().await.len() == 0 },
Duration::from_secs(5),
)
.await;
wait_for_condition(
|| async {
let n = pmb_net1
.get_route()
.get_next_hop(pma_net1.my_peer_id())
.await;
n.is_none()
},
Duration::from_secs(5),
)
.await;
wait_for_condition(
|| async {
// only remain pmb center
pmb_net1.list_routes().await.len() == 1
},
Duration::from_secs(15),
)
.await;
}
#[tokio::test]
async fn test_foreign_network_manager_cluster_multi_net() {
set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1);
let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let pm_center2 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let pm_center3 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
connect_peer_manager(pm_center1.clone(), pm_center2.clone()).await;
connect_peer_manager(pm_center2.clone(), pm_center3.clone()).await;
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
connect_peer_manager(pma_net1.clone(), pm_center1.clone()).await;
connect_peer_manager(pmb_net1.clone(), pm_center2.clone()).await;
let pma_net2 = create_mock_peer_manager_for_foreign_network("net2").await;
let pmb_net2 = create_mock_peer_manager_for_foreign_network("net2").await;
connect_peer_manager(pma_net2.clone(), pm_center2.clone()).await;
connect_peer_manager(pmb_net2.clone(), pm_center3.clone()).await;
let pma_net3 = create_mock_peer_manager_for_foreign_network("net3").await;
let pmb_net3 = create_mock_peer_manager_for_foreign_network("net3").await;
connect_peer_manager(pma_net3.clone(), pm_center1.clone()).await;
connect_peer_manager(pmb_net3.clone(), pm_center3.clone()).await;
let pma_net4 = create_mock_peer_manager_for_foreign_network("net4").await;
let pmb_net4 = create_mock_peer_manager_for_foreign_network("net4").await;
let pmc_net4 = create_mock_peer_manager_for_foreign_network("net4").await;
connect_peer_manager(pma_net4.clone(), pm_center1.clone()).await;
connect_peer_manager(pmb_net4.clone(), pm_center2.clone()).await;
connect_peer_manager(pmc_net4.clone(), pm_center3.clone()).await;
tokio::time::sleep(Duration::from_secs(5)).await;
wait_route_appear(pma_net1.clone(), pmb_net1.clone())
.await
.unwrap();
wait_route_appear(pma_net2.clone(), pmb_net2.clone())
.await
.unwrap();
wait_route_appear(pma_net3.clone(), pmb_net3.clone())
.await
.unwrap();
wait_route_appear(pma_net4.clone(), pmb_net4.clone())
.await
.unwrap();
wait_route_appear(pma_net4.clone(), pmc_net4.clone())
.await
.unwrap();
wait_route_appear(pmb_net4.clone(), pmc_net4.clone())
.await
.unwrap();
assert_eq!(3, pma_net1.list_routes().await.len());
assert_eq!(3, pmb_net1.list_routes().await.len());
assert_eq!(3, pma_net2.list_routes().await.len());
assert_eq!(3, pmb_net2.list_routes().await.len());
assert_eq!(3, pma_net3.list_routes().await.len());
assert_eq!(3, pmb_net3.list_routes().await.len());
assert_eq!(5, pma_net4.list_routes().await.len());
assert_eq!(5, pmb_net4.list_routes().await.len());
assert_eq!(5, pmc_net4.list_routes().await.len());
drop(pm_center3);
tokio::time::sleep(Duration::from_secs(5)).await;
assert_eq!(1, pma_net2.list_routes().await.len());
assert_eq!(1, pma_net3.list_routes().await.len());
assert_eq!(3, pma_net4.list_routes().await.len());
}
#[tokio::test]
async fn test_foreign_network_manager_cluster_secret_mismatch() {
set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1);
let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let pm_center2 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let pm_center3 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
connect_peer_manager(pm_center1.clone(), pm_center2.clone()).await;
connect_peer_manager(pm_center2.clone(), pm_center3.clone()).await;
let pma_net4 = create_mock_peer_manager_for_foreign_network_ext("net4", "1").await;
let pmb_net4 = create_mock_peer_manager_for_foreign_network_ext("net4", "2").await;
let pmc_net4 = create_mock_peer_manager_for_foreign_network_ext("net4", "3").await;
connect_peer_manager(pma_net4.clone(), pm_center1.clone()).await;
connect_peer_manager(pmb_net4.clone(), pm_center2.clone()).await;
connect_peer_manager(pmc_net4.clone(), pm_center3.clone()).await;
tokio::time::sleep(Duration::from_secs(5)).await;
assert_eq!(1, pma_net4.list_routes().await.len());
assert_eq!(1, pmb_net4.list_routes().await.len());
assert_eq!(1, pmc_net4.list_routes().await.len());
}
}