fix peer manager stuck when sending large peer rpc (#572)

This commit is contained in:
Sijie.Sun
2025-01-17 06:50:21 +08:00
committed by GitHub
parent c23b544c34
commit 1194ee1c2d
10 changed files with 132 additions and 36 deletions
+3 -3
View File
@@ -23,7 +23,7 @@ use crate::peer_center::instance::PeerCenterInstance;
use crate::peers::peer_conn::PeerConnId; use crate::peers::peer_conn::PeerConnId;
use crate::peers::peer_manager::{PeerManager, RouteAlgoType}; use crate::peers::peer_manager::{PeerManager, RouteAlgoType};
use crate::peers::rpc_service::PeerManagerRpcService; use crate::peers::rpc_service::PeerManagerRpcService;
use crate::peers::PacketRecvChanReceiver; use crate::peers::{create_packet_recv_chan, recv_packet_from_chan, PacketRecvChanReceiver};
use crate::proto::cli::VpnPortalRpc; use crate::proto::cli::VpnPortalRpc;
use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo}; use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo};
use crate::proto::peer_rpc::PeerCenterRpcServer; use crate::proto::peer_rpc::PeerCenterRpcServer;
@@ -137,7 +137,7 @@ impl Instance {
global_ctx.config.dump() global_ctx.config.dump()
); );
let (peer_packet_sender, peer_packet_receiver) = tokio::sync::mpsc::channel(100); let (peer_packet_sender, peer_packet_receiver) = create_packet_recv_chan();
let id = global_ctx.get_id(); let id = global_ctx.get_id();
@@ -230,7 +230,7 @@ impl Instance {
let mut tasks = JoinSet::new(); let mut tasks = JoinSet::new();
tasks.spawn(async move { tasks.spawn(async move {
let mut packet_recv = packet_recv.lock().await; let mut packet_recv = packet_recv.lock().await;
while let Some(packet) = packet_recv.recv().await { while let Ok(packet) = recv_packet_from_chan(&mut packet_recv).await {
tracing::trace!("packet consumed by mock nic ctx: {:?}", packet); tracing::trace!("packet consumed by mock nic ctx: {:?}", packet);
} }
}); });
+2 -2
View File
@@ -12,7 +12,7 @@ use crate::{
global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, global_ctx::{ArcGlobalCtx, GlobalCtxEvent},
ifcfg::{IfConfiger, IfConfiguerTrait}, ifcfg::{IfConfiger, IfConfiguerTrait},
}, },
peers::{peer_manager::PeerManager, PacketRecvChanReceiver}, peers::{peer_manager::PeerManager, recv_packet_from_chan, PacketRecvChanReceiver},
tunnel::{ tunnel::{
common::{reserve_buf, FramedWriter, TunnelWrapper, ZCPacketToBytes}, common::{reserve_buf, FramedWriter, TunnelWrapper, ZCPacketToBytes},
packet_def::{ZCPacket, ZCPacketType, TAIL_RESERVED_SIZE}, packet_def::{ZCPacket, ZCPacketType, TAIL_RESERVED_SIZE},
@@ -610,7 +610,7 @@ impl NicCtx {
self.tasks.spawn(async move { self.tasks.spawn(async move {
// unlock until coroutine finished // unlock until coroutine finished
let mut channel = channel.lock().await; let mut channel = channel.lock().await;
while let Some(packet) = channel.recv().await { while let Ok(packet) = recv_packet_from_chan(&mut channel).await {
tracing::trace!( tracing::trace!(
"[USER_PACKET] forward packet from peers to nic. packet: {:?}", "[USER_PACKET] forward packet from peers to nic. packet: {:?}",
packet packet
@@ -37,11 +37,13 @@ use crate::{
}; };
use super::{ use super::{
create_packet_recv_chan,
peer_conn::PeerConn, peer_conn::PeerConn,
peer_map::PeerMap, peer_map::PeerMap,
peer_ospf_route::PeerRoute, peer_ospf_route::PeerRoute,
peer_rpc::{PeerRpcManager, PeerRpcManagerTransport}, peer_rpc::{PeerRpcManager, PeerRpcManagerTransport},
peer_rpc_service::DirectConnectorManagerRpcServer, peer_rpc_service::DirectConnectorManagerRpcServer,
recv_packet_from_chan,
route_trait::NextHopPolicy, route_trait::NextHopPolicy,
PacketRecvChan, PacketRecvChanReceiver, PacketRecvChan, PacketRecvChanReceiver,
}; };
@@ -79,7 +81,7 @@ impl ForeignNetworkEntry {
) -> Self { ) -> Self {
let foreign_global_ctx = Self::build_foreign_global_ctx(&network, global_ctx.clone()); let foreign_global_ctx = Self::build_foreign_global_ctx(&network, global_ctx.clone());
let (packet_sender, packet_recv) = mpsc::channel(64); let (packet_sender, packet_recv) = create_packet_recv_chan();
let peer_map = Arc::new(PeerMap::new( let peer_map = Arc::new(PeerMap::new(
packet_sender, packet_sender,
@@ -251,7 +253,7 @@ impl ForeignNetworkEntry {
let network_name = self.network.network_name.clone(); let network_name = self.network.network_name.clone();
self.tasks.lock().await.spawn(async move { self.tasks.lock().await.spawn(async move {
while let Some(zc_packet) = recv.recv().await { while let Ok(zc_packet) = recv_packet_from_chan(&mut recv).await {
let Some(hdr) = zc_packet.peer_manager_header() else { let Some(hdr) = zc_packet.peer_manager_header() else {
tracing::warn!("invalid packet, skip"); tracing::warn!("invalid packet, skip");
continue; continue;
@@ -622,7 +624,7 @@ mod tests {
network: &str, network: &str,
secret: &str, secret: &str,
) -> Arc<PeerManager> { ) -> Arc<PeerManager> {
let (s, _r) = tokio::sync::mpsc::channel(1000); let (s, _r) = create_packet_recv_chan();
let peer_mgr = Arc::new(PeerManager::new( let peer_mgr = Arc::new(PeerManager::new(
RouteAlgoType::Ospf, RouteAlgoType::Ospf,
get_mock_global_ctx_with_network(Some(NetworkIdentity::new( get_mock_global_ctx_with_network(Some(NetworkIdentity::new(
+16
View File
@@ -39,5 +39,21 @@ pub trait NicPacketFilter {
type BoxPeerPacketFilter = Box<dyn PeerPacketFilter + Send + Sync>; type BoxPeerPacketFilter = Box<dyn PeerPacketFilter + Send + Sync>;
type BoxNicPacketFilter = Box<dyn NicPacketFilter + Send + Sync>; type BoxNicPacketFilter = Box<dyn NicPacketFilter + Send + Sync>;
// pub type PacketRecvChan = tachyonix::Sender<ZCPacket>;
// pub type PacketRecvChanReceiver = tachyonix::Receiver<ZCPacket>;
// pub fn create_packet_recv_chan() -> (PacketRecvChan, PacketRecvChanReceiver) {
// tachyonix::channel(128)
// }
pub type PacketRecvChan = tokio::sync::mpsc::Sender<ZCPacket>; pub type PacketRecvChan = tokio::sync::mpsc::Sender<ZCPacket>;
pub type PacketRecvChanReceiver = tokio::sync::mpsc::Receiver<ZCPacket>; pub type PacketRecvChanReceiver = tokio::sync::mpsc::Receiver<ZCPacket>;
/// Creates the bounded channel that carries `ZCPacket`s.
///
/// Returns the `(sender, receiver)` halves of a `tokio::sync::mpsc` channel
/// with room for 128 buffered packets.
pub fn create_packet_recv_chan() -> (PacketRecvChan, PacketRecvChanReceiver) {
    // Number of packets the channel buffers before it is full.
    const PACKET_RECV_CHAN_CAPACITY: usize = 128;

    let (sender, receiver) = tokio::sync::mpsc::channel(PACKET_RECV_CHAN_CAPACITY);
    (sender, receiver)
}
/// Waits for the next packet on `packet_recv_chan_receiver` and returns it.
///
/// # Errors
///
/// Returns an error when the receiver's `recv()` yields `None` instead of a
/// packet.
pub async fn recv_packet_from_chan(
    packet_recv_chan_receiver: &mut PacketRecvChanReceiver,
) -> Result<ZCPacket, anyhow::Error> {
    packet_recv_chan_receiver
        .recv()
        .await
        // Build the error lazily: `ok_or` would construct (and then drop) an
        // `anyhow::Error` for every packet that is received successfully.
        .ok_or_else(|| anyhow::anyhow!("recv_packet_from_chan failed"))
}
+4 -4
View File
@@ -171,11 +171,11 @@ impl Drop for Peer {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use tokio::{sync::mpsc, time::timeout}; use tokio::time::timeout;
use crate::{ use crate::{
common::{global_ctx::tests::get_mock_global_ctx, new_peer_id}, common::{global_ctx::tests::get_mock_global_ctx, new_peer_id},
peers::peer_conn::PeerConn, peers::{create_packet_recv_chan, peer_conn::PeerConn},
tunnel::ring::create_ring_tunnel_pair, tunnel::ring::create_ring_tunnel_pair,
}; };
@@ -183,8 +183,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn close_peer() { async fn close_peer() {
let (local_packet_send, _local_packet_recv) = mpsc::channel(10); let (local_packet_send, _local_packet_recv) = create_packet_recv_chan();
let (remote_packet_send, _remote_packet_recv) = mpsc::channel(10); let (remote_packet_send, _remote_packet_recv) = create_packet_recv_chan();
let global_ctx = get_mock_global_ctx(); let global_ctx = get_mock_global_ctx();
let local_peer = Peer::new(new_peer_id(), local_packet_send, global_ctx.clone()); let local_peer = Peer::new(new_peer_id(), local_packet_send, global_ctx.clone());
let remote_peer = Peer::new(new_peer_id(), remote_packet_send, global_ctx.clone()); let remote_peer = Peer::new(new_peer_id(), remote_packet_send, global_ctx.clone());
+3 -6
View File
@@ -413,6 +413,7 @@ mod tests {
use crate::common::global_ctx::tests::get_mock_global_ctx; use crate::common::global_ctx::tests::get_mock_global_ctx;
use crate::common::new_peer_id; use crate::common::new_peer_id;
use crate::common::scoped_task::ScopedTask; use crate::common::scoped_task::ScopedTask;
use crate::peers::create_packet_recv_chan;
use crate::tunnel::filter::tests::DropSendTunnelFilter; use crate::tunnel::filter::tests::DropSendTunnelFilter;
use crate::tunnel::filter::PacketRecorderTunnelFilter; use crate::tunnel::filter::PacketRecorderTunnelFilter;
use crate::tunnel::ring::create_ring_tunnel_pair; use crate::tunnel::ring::create_ring_tunnel_pair;
@@ -496,9 +497,7 @@ mod tests {
); );
s_peer.set_close_event_sender(tokio::sync::mpsc::channel(1).0); s_peer.set_close_event_sender(tokio::sync::mpsc::channel(1).0);
s_peer s_peer.start_recv_loop(create_packet_recv_chan().0).await;
.start_recv_loop(tokio::sync::mpsc::channel(200).0)
.await;
// do not start ping for s, s only responds to ping from c // do not start ping for s, s only responds to ping from c
assert!(c_ret.is_ok()); assert!(c_ret.is_ok());
@@ -507,9 +506,7 @@ mod tests {
let (close_send, mut close_recv) = tokio::sync::mpsc::channel(1); let (close_send, mut close_recv) = tokio::sync::mpsc::channel(1);
c_peer.set_close_event_sender(close_send); c_peer.set_close_event_sender(close_send);
c_peer.start_pingpong(); c_peer.start_pingpong();
c_peer c_peer.start_recv_loop(create_packet_recv_chan().0).await;
.start_recv_loop(tokio::sync::mpsc::channel(200).0)
.await;
let throughput = c_peer.throughput.clone(); let throughput = c_peer.throughput.clone();
let _t = ScopedTask::from(tokio::spawn(async move { let _t = ScopedTask::from(tokio::spawn(async move {
+12 -9
View File
@@ -30,6 +30,7 @@ use crate::{
peers::{ peers::{
peer_conn::PeerConn, peer_conn::PeerConn,
peer_rpc::PeerRpcManagerTransport, peer_rpc::PeerRpcManagerTransport,
recv_packet_from_chan,
route_trait::{ForeignNetworkRouteInfoMap, NextHopPolicy, RouteInterface}, route_trait::{ForeignNetworkRouteInfoMap, NextHopPolicy, RouteInterface},
PeerPacketFilter, PeerPacketFilter,
}, },
@@ -43,11 +44,12 @@ use crate::{
tunnel::{ tunnel::{
self, self,
packet_def::{CompressorAlgo, PacketType, ZCPacket}, packet_def::{CompressorAlgo, PacketType, ZCPacket},
SinkItem, Tunnel, TunnelConnector, Tunnel, TunnelConnector,
}, },
}; };
use super::{ use super::{
create_packet_recv_chan,
encrypt::{Encryptor, NullCipher}, encrypt::{Encryptor, NullCipher},
foreign_network_client::ForeignNetworkClient, foreign_network_client::ForeignNetworkClient,
foreign_network_manager::{ForeignNetworkManager, GlobalForeignNetworkAccessor}, foreign_network_manager::{ForeignNetworkManager, GlobalForeignNetworkAccessor},
@@ -56,7 +58,7 @@ use super::{
peer_ospf_route::PeerRoute, peer_ospf_route::PeerRoute,
peer_rpc::PeerRpcManager, peer_rpc::PeerRpcManager,
route_trait::{ArcRoute, Route}, route_trait::{ArcRoute, Route},
BoxNicPacketFilter, BoxPeerPacketFilter, PacketRecvChanReceiver, BoxNicPacketFilter, BoxPeerPacketFilter, PacketRecvChan, PacketRecvChanReceiver,
}; };
struct RpcTransport { struct RpcTransport {
@@ -116,7 +118,7 @@ pub struct PeerManager {
my_peer_id: PeerId, my_peer_id: PeerId,
global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx,
nic_channel: mpsc::Sender<SinkItem>, nic_channel: PacketRecvChan,
tasks: Arc<Mutex<JoinSet<()>>>, tasks: Arc<Mutex<JoinSet<()>>>,
@@ -155,11 +157,11 @@ impl PeerManager {
pub fn new( pub fn new(
route_algo: RouteAlgoType, route_algo: RouteAlgoType,
global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx,
nic_channel: mpsc::Sender<SinkItem>, nic_channel: PacketRecvChan,
) -> Self { ) -> Self {
let my_peer_id = rand::random(); let my_peer_id = rand::random();
let (packet_send, packet_recv) = mpsc::channel(128); let (packet_send, packet_recv) = create_packet_recv_chan();
let peers = Arc::new(PeerMap::new( let peers = Arc::new(PeerMap::new(
packet_send.clone(), packet_send.clone(),
global_ctx.clone(), global_ctx.clone(),
@@ -417,7 +419,7 @@ impl PeerManager {
let encryptor = self.encryptor.clone(); let encryptor = self.encryptor.clone();
self.tasks.lock().await.spawn(async move { self.tasks.lock().await.spawn(async move {
tracing::trace!("start_peer_recv"); tracing::trace!("start_peer_recv");
while let Some(ret) = recv.recv().await { while let Ok(ret) = recv_packet_from_chan(&mut recv).await {
let Err(mut ret) = let Err(mut ret) =
Self::try_handle_foreign_network_packet(ret, my_peer_id, &peers, &foreign_mgr) Self::try_handle_foreign_network_packet(ret, my_peer_id, &peers, &foreign_mgr)
.await .await
@@ -505,7 +507,7 @@ impl PeerManager {
async fn init_packet_process_pipeline(&self) { async fn init_packet_process_pipeline(&self) {
// for tun/tap ip/eth packet. // for tun/tap ip/eth packet.
struct NicPacketProcessor { struct NicPacketProcessor {
nic_channel: mpsc::Sender<SinkItem>, nic_channel: PacketRecvChan,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl PeerPacketFilter for NicPacketProcessor { impl PeerPacketFilter for NicPacketProcessor {
@@ -875,7 +877,7 @@ impl PeerManager {
self.global_ctx.clone() self.global_ctx.clone()
} }
pub fn get_nic_channel(&self) -> mpsc::Sender<SinkItem> { pub fn get_nic_channel(&self) -> PacketRecvChan {
self.nic_channel.clone() self.nic_channel.clone()
} }
@@ -935,6 +937,7 @@ mod tests {
}, },
instance::listeners::get_listener_by_url, instance::listeners::get_listener_by_url,
peers::{ peers::{
create_packet_recv_chan,
peer_manager::RouteAlgoType, peer_manager::RouteAlgoType,
peer_rpc::tests::register_service, peer_rpc::tests::register_service,
route_trait::NextHopPolicy, route_trait::NextHopPolicy,
@@ -1078,7 +1081,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn communicate_between_enc_and_non_enc() { async fn communicate_between_enc_and_non_enc() {
let create_mgr = |enable_encryption| async move { let create_mgr = |enable_encryption| async move {
let (s, _r) = tokio::sync::mpsc::channel(1000); let (s, _r) = create_packet_recv_chan();
let mock_global_ctx = get_mock_global_ctx(); let mock_global_ctx = get_mock_global_ctx();
mock_global_ctx.config.set_flags(Flags { mock_global_ctx.config.set_flags(Flags {
enable_encryption, enable_encryption,
+11 -5
View File
@@ -70,11 +70,17 @@ impl PeerMap {
pub async fn send_msg_directly(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> { pub async fn send_msg_directly(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
if dst_peer_id == self.my_peer_id { if dst_peer_id == self.my_peer_id {
return Ok(self let packet_send = self.packet_send.clone();
.packet_send tokio::spawn(async move {
.send(msg) let ret = packet_send
.await .send(msg)
.with_context(|| "send msg to self failed")?); .await
.with_context(|| "send msg to self failed");
if ret.is_err() {
tracing::error!("send msg to self failed: {:?}", ret);
}
});
return Ok(());
} }
match self.get_peer_by_id(dst_peer_id) { match self.get_peer_by_id(dst_peer_id) {
+2 -1
View File
@@ -2117,6 +2117,7 @@ mod tests {
common::{global_ctx::tests::get_mock_global_ctx, PeerId}, common::{global_ctx::tests::get_mock_global_ctx, PeerId},
connector::udp_hole_punch::tests::replace_stun_info_collector, connector::udp_hole_punch::tests::replace_stun_info_collector,
peers::{ peers::{
create_packet_recv_chan,
peer_manager::{PeerManager, RouteAlgoType}, peer_manager::{PeerManager, RouteAlgoType},
route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface}, route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface},
tests::connect_peer_manager, tests::connect_peer_manager,
@@ -2154,7 +2155,7 @@ mod tests {
} }
async fn create_mock_pmgr() -> Arc<PeerManager> { async fn create_mock_pmgr() -> Arc<PeerManager> {
let (s, _r) = tokio::sync::mpsc::channel(1000); let (s, _r) = create_packet_recv_chan();
let peer_mgr = Arc::new(PeerManager::new( let peer_mgr = Arc::new(PeerManager::new(
RouteAlgoType::None, RouteAlgoType::None,
get_mock_global_ctx(), get_mock_global_ctx(),
+74 -3
View File
@@ -1,14 +1,24 @@
use std::sync::Arc; use std::sync::Arc;
use crate::{ use crate::{
common::{error::Error, global_ctx::tests::get_mock_global_ctx, PeerId}, common::{
error::Error,
global_ctx::{
tests::{get_mock_global_ctx, get_mock_global_ctx_with_network},
NetworkIdentity,
},
PeerId,
},
tunnel::ring::create_ring_tunnel_pair, tunnel::ring::create_ring_tunnel_pair,
}; };
use super::peer_manager::{PeerManager, RouteAlgoType}; use super::{
create_packet_recv_chan,
peer_manager::{PeerManager, RouteAlgoType},
};
pub async fn create_mock_peer_manager() -> Arc<PeerManager> { pub async fn create_mock_peer_manager() -> Arc<PeerManager> {
let (s, _r) = tokio::sync::mpsc::channel(1000); let (s, _r) = create_packet_recv_chan();
let peer_mgr = Arc::new(PeerManager::new( let peer_mgr = Arc::new(PeerManager::new(
RouteAlgoType::Ospf, RouteAlgoType::Ospf,
get_mock_global_ctx(), get_mock_global_ctx(),
@@ -18,6 +28,15 @@ pub async fn create_mock_peer_manager() -> Arc<PeerManager> {
peer_mgr peer_mgr
} }
/// Builds and starts a mock `PeerManager` that belongs to the network
/// called `network_name`.
///
/// The network identity is created with an empty string as its second
/// argument (presumably the network secret; confirm against
/// `NetworkIdentity::new`). The manager uses OSPF routing and is already
/// running when it is returned.
///
/// # Panics
///
/// Panics if `PeerManager::run` returns an error.
pub async fn create_mock_peer_manager_with_name(network_name: String) -> Arc<PeerManager> {
    // The receiving half is never read; it is only kept alive until this
    // function returns.
    let (nic_sender, _nic_receiver) = create_packet_recv_chan();

    let identity = NetworkIdentity::new(network_name, String::new());
    let global_ctx = get_mock_global_ctx_with_network(Some(identity));

    let manager = Arc::new(PeerManager::new(
        RouteAlgoType::Ospf,
        global_ctx,
        nic_sender,
    ));
    manager.run().await.unwrap();
    manager
}
pub async fn connect_peer_manager(client: Arc<PeerManager>, server: Arc<PeerManager>) { pub async fn connect_peer_manager(client: Arc<PeerManager>, server: Arc<PeerManager>) {
let (a_ring, b_ring) = create_ring_tunnel_pair(); let (a_ring, b_ring) = create_ring_tunnel_pair();
let a_mgr_copy = client.clone(); let a_mgr_copy = client.clone();
@@ -56,3 +75,55 @@ pub async fn wait_route_appear(
wait_route_appear_with_cost(peer_mgr.clone(), target_peer.my_peer_id(), None).await?; wait_route_appear_with_cost(peer_mgr.clone(), target_peer.my_peer_id(), None).await?;
wait_route_appear_with_cost(target_peer, peer_mgr.my_peer_id(), None).await wait_route_appear_with_cost(target_peer, peer_mgr.my_peer_id(), None).await
} }
/// Stress test with many foreign networks attached to a small set of public
/// peers.
///
/// Setup: three public peer managers connected pairwise, plus 20 named
/// networks of three peers each, every peer connected to a randomly chosen
/// public peer.
///
/// Then, five times over: print the state of each public peer, sleep for one
/// second, connect a brand-new peer to public peer 0, and keep retrying
/// `wait_route_appear` until public peer 1 and the new peer see each other.
#[tokio::test]
async fn foreign_mgr_stress_test() {
    const FOREIGN_NETWORK_COUNT: usize = 20;
    const PEER_PER_NETWORK: usize = 3;
    const PUBLIC_PEER_COUNT: usize = 3;

    let mut public_peers = Vec::with_capacity(PUBLIC_PEER_COUNT);
    for _ in 0..PUBLIC_PEER_COUNT {
        public_peers.push(create_mock_peer_manager().await);
    }

    // Connect every pair of the three public peers.
    for (client_idx, server_idx) in [(0, 1), (0, 2), (1, 2)] {
        connect_peer_manager(
            public_peers[client_idx].clone(),
            public_peers[server_idx].clone(),
        )
        .await;
    }

    // Kept until the end of the test so the foreign peers are not dropped.
    let mut foreign_networks = Vec::with_capacity(FOREIGN_NETWORK_COUNT);
    for network_idx in 0..FOREIGN_NETWORK_COUNT {
        let network_name = format!("foreign-network-test-{}", network_idx);
        let mut members = Vec::with_capacity(PEER_PER_NETWORK);
        for _ in 0..PEER_PER_NETWORK {
            let member = create_mock_peer_manager_with_name(network_name.clone()).await;
            // Attach each foreign peer to a randomly picked public peer.
            let entry_idx = rand::random::<usize>() % public_peers.len();
            connect_peer_manager(member.clone(), public_peers[entry_idx].clone()).await;
            members.push(member);
        }
        foreign_networks.push(members);
    }

    for _ in 0..5 {
        for (idx, public_peer) in public_peers.iter().enumerate() {
            println!(
                "public peer {} routes: {:?}, global_foreign_network: {:?}, peers: {:?}",
                idx,
                public_peer.list_routes().await,
                public_peer
                    .list_global_foreign_network()
                    .await
                    .foreign_networks
                    .len(),
                public_peer.get_peer_map().list_peers().await
            );
        }

        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let newcomer = create_mock_peer_manager().await;
        connect_peer_manager(newcomer.clone(), public_peers[0].clone()).await;

        // Retry until the route shows up; each failed attempt is only logged.
        while let Err(e) = wait_route_appear(public_peers[1].clone(), newcomer.clone()).await {
            println!("wait route ret: {:?}", e);
        }
    }
}