use anyhow::Context; use async_trait::async_trait; use cidr::{Ipv4Cidr, Ipv6Cidr}; use dashmap::DashMap; use std::collections::BTreeSet; use std::{ fmt::Debug, net::{IpAddr, Ipv4Addr, Ipv6Addr}, sync::{Arc, Weak, atomic::AtomicBool}, time::{Duration, Instant, SystemTime}, }; use tokio::{ sync::{ Mutex, RwLock, mpsc::{self, UnboundedReceiver, UnboundedSender}, }, task::JoinSet, }; use crate::{ common::{ PeerId, compressor::{Compressor as _, DefaultCompressor}, constants::EASYTIER_VERSION, error::Error, global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, shrink_dashmap, stats_manager::{CounterHandle, LabelSet, LabelType, MetricName}, stun::StunInfoCollectorTrait, }, peers::{ PeerPacketFilter, peer_conn::PeerConn, peer_rpc::PeerRpcManagerTransport, peer_session::PeerSessionStore, recv_packet_from_chan, route_trait::{ForeignNetworkRouteInfoMap, MockRoute, NextHopPolicy, RouteInterface}, traffic_metrics::{ InstanceLabelKind, LogicalTrafficMetrics, TrafficKind, TrafficMetricRecorder, is_relay_data_packet_type, route_peer_info_instance_id, traffic_kind, }, }, proto::{ api::instance::{ self, ListGlobalForeignNetworkResponse, list_global_foreign_network_response::OneForeignNetwork, }, peer_rpc::{ ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, PeerIdentityType, RouteForeignNetworkSummary, }, }, tunnel::{ self, Tunnel, TunnelConnector, packet_def::{CompressorAlgo, PacketType, ZCPacket}, }, }; use super::{ BoxNicPacketFilter, BoxPeerPacketFilter, PacketRecvChan, PacketRecvChanReceiver, create_packet_recv_chan, encrypt::{Encryptor, NullCipher}, foreign_network_client::ForeignNetworkClient, foreign_network_manager::{ForeignNetworkManager, GlobalForeignNetworkAccessor}, peer_conn::PeerConnId, peer_map::PeerMap, peer_ospf_route::PeerRoute, peer_rpc::PeerRpcManager, peer_task::ExternalTaskSignal, relay_peer_map::RelayPeerMap, route_trait::{ArcRoute, Route}, }; struct RpcTransport { my_peer_id: PeerId, peers: Weak, // TODO: this seems can be removed foreign_peers: Mutex>>, packet_recv: Mutex>, peer_rpc_tspt_sender: UnboundedSender, encryptor: Arc, is_secure_mode_enabled: bool, } #[async_trait::async_trait] impl PeerRpcManagerTransport for RpcTransport { fn my_peer_id(&self) -> PeerId { self.my_peer_id } async fn send(&self, mut msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> { let peers = self.peers.upgrade().ok_or(Error::Unknown)?; // NOTE: if route info is not exchanged, this will return None. treat it as public server. let is_dst_peer_public_server = peers .get_route_peer_info(dst_peer_id) .await .and_then(|x| x.feature_flag.map(|x| x.is_public_server)) // if dst is directly connected, it's must not public server .unwrap_or(!peers.has_peer(dst_peer_id)); if !is_dst_peer_public_server && !self.is_secure_mode_enabled { self.encryptor .encrypt(&mut msg) .with_context(|| "encrypt failed")?; } // send to self and this packet will be forwarded in peer_recv loop peers.send_msg_directly(msg, self.my_peer_id).await } async fn recv(&self) -> Result { if let Some(o) = self.packet_recv.lock().await.recv().await { Ok(o) } else { Err(Error::Unknown) } } } pub enum RouteAlgoType { Ospf, None, } enum RouteAlgoInst { Ospf(Arc), None, } impl Clone for RouteAlgoInst { fn clone(&self) -> Self { match self { RouteAlgoInst::Ospf(route) => RouteAlgoInst::Ospf(route.clone()), RouteAlgoInst::None => RouteAlgoInst::None, } } } struct SelfTxCounters { self_tx_packets: CounterHandle, self_tx_bytes: CounterHandle, compress_tx_bytes_before: CounterHandle, compress_tx_bytes_after: CounterHandle, } pub struct PeerManager { my_peer_id: PeerId, global_ctx: ArcGlobalCtx, nic_channel: PacketRecvChan, tasks: Mutex>, packet_recv: Arc>>, peers: Arc, peer_rpc_mgr: Arc, peer_rpc_tspt: Arc, peer_packet_process_pipeline: Arc>>, nic_packet_process_pipeline: Arc>>, route_algo_inst: RouteAlgoInst, foreign_network_manager: Arc, foreign_network_client: Arc, relay_peer_map: Arc, encryptor: Arc, data_compress_algo: CompressorAlgo, exit_nodes: RwLock>, reserved_my_peer_id_map: DashMap, recent_have_traffic: Arc>, p2p_demand_notify: Arc, allow_loopback_tunnel: AtomicBool, self_tx_counters: SelfTxCounters, traffic_metrics: Arc, peer_session_store: Arc, is_secure_mode_enabled: bool, } impl Debug for PeerManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PeerManager") .field("my_peer_id", &self.my_peer_id()) .field("instance_name", &self.global_ctx.inst_name) .field("net_ns", &self.global_ctx.net_ns.name()) .finish() } } impl PeerManager { // Keep lazy-p2p demand alive across the 5s task rescan interval and a full on-demand // connect attempt, without retaining extra per-task state in the hot path. const RECENT_HAVE_TRAFFIC_TTL: Duration = Duration::from_secs(30); fn should_mark_recent_traffic_for_fanout(total_dst_peers: usize) -> bool { total_dst_peers <= 1 } fn gc_recent_traffic_entries( recent_have_traffic: &DashMap, now: Instant, mut has_directly_connected_conn: F, ) where F: FnMut(PeerId) -> bool, { let mut to_remove = Vec::new(); for entry in recent_have_traffic.iter() { let peer_id = *entry.key(); let expired = now.saturating_duration_since(*entry.value()) > Self::RECENT_HAVE_TRAFFIC_TTL; if expired || has_directly_connected_conn(peer_id) { to_remove.push(peer_id); } } if !to_remove.is_empty() { for peer_id in to_remove { recent_have_traffic.remove(&peer_id); } shrink_dashmap(recent_have_traffic, None); } } pub fn new( route_algo: RouteAlgoType, global_ctx: ArcGlobalCtx, nic_channel: PacketRecvChan, ) -> Self { let my_peer_id = rand::random(); let (packet_send, packet_recv) = create_packet_recv_chan(); let peers = Arc::new(PeerMap::new( packet_send.clone(), global_ctx.clone(), my_peer_id, )); let peer_session_store = Arc::new(PeerSessionStore::new()); let encryptor = if global_ctx.get_flags().enable_encryption { // 只有在启用加密时才使用工厂函数选择算法 let algorithm = &global_ctx.get_flags().encryption_algorithm; super::encrypt::create_encryptor( algorithm, global_ctx.get_128_key(), global_ctx.get_256_key(), ) } else { // disable_encryption = true 时使用 NullCipher Arc::new(NullCipher) }; if global_ctx .check_network_in_whitelist(&global_ctx.get_network_name()) .is_err() { // if local network is not in whitelist, avoid relay data when exist any other route path global_ctx.set_avoid_relay_data_preference(true); } let is_secure_mode_enabled = global_ctx .config .get_secure_mode() .map(|cfg| cfg.enabled) .unwrap_or(false); // TODO: remove these because we have impl pipeline processor. let (peer_rpc_tspt_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel(); let rpc_tspt = Arc::new(RpcTransport { my_peer_id, peers: Arc::downgrade(&peers), foreign_peers: Mutex::new(None), packet_recv: Mutex::new(peer_rpc_tspt_recv), peer_rpc_tspt_sender, encryptor: encryptor.clone(), is_secure_mode_enabled, }); let peer_rpc_mgr = Arc::new(PeerRpcManager::new_with_stats_manager( rpc_tspt.clone(), global_ctx.stats_manager().clone(), )); let route_algo_inst = match route_algo { RouteAlgoType::Ospf => RouteAlgoInst::Ospf(PeerRoute::new( my_peer_id, global_ctx.clone(), peer_rpc_mgr.clone(), )), RouteAlgoType::None => RouteAlgoInst::None, }; let foreign_network_manager = Arc::new(ForeignNetworkManager::new( my_peer_id, global_ctx.clone(), peer_session_store.clone(), packet_send.clone(), Self::build_foreign_network_manager_accessor(&peers), )); let foreign_network_client = Arc::new(ForeignNetworkClient::new( global_ctx.clone(), packet_send, peer_rpc_mgr.clone(), my_peer_id, )); let data_compress_algo = global_ctx .get_flags() .data_compress_algo() .try_into() .expect("invalid data compress algo, maybe some features not enabled"); let exit_nodes = global_ctx.config.get_exit_nodes(); let stats_manager = global_ctx.stats_manager(); let network_name = global_ctx.get_network_name(); let traffic_tx_metrics = Arc::new(LogicalTrafficMetrics::new( stats_manager.clone(), network_name.clone(), MetricName::TrafficBytesTx, MetricName::TrafficPacketsTx, MetricName::TrafficBytesTxByInstance, MetricName::TrafficPacketsTxByInstance, InstanceLabelKind::To, )); let traffic_control_tx_metrics = Arc::new(LogicalTrafficMetrics::new( stats_manager.clone(), network_name.clone(), MetricName::TrafficControlBytesTx, MetricName::TrafficControlPacketsTx, MetricName::TrafficControlBytesTxByInstance, MetricName::TrafficControlPacketsTxByInstance, InstanceLabelKind::To, )); let relay_peer_map = RelayPeerMap::new( peers.clone(), Some(foreign_network_client.clone()), global_ctx.clone(), my_peer_id, peer_session_store.clone(), ); let self_tx_counters = SelfTxCounters { self_tx_packets: stats_manager.get_counter( MetricName::TrafficPacketsSelfTx, LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())), ), self_tx_bytes: stats_manager.get_counter( MetricName::TrafficBytesSelfTx, LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())), ), compress_tx_bytes_before: stats_manager.get_counter( MetricName::CompressionBytesTxBefore, LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())), ), compress_tx_bytes_after: stats_manager.get_counter( MetricName::CompressionBytesTxAfter, LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())), ), }; let traffic_rx_metrics = Arc::new(LogicalTrafficMetrics::new( stats_manager.clone(), network_name, MetricName::TrafficBytesRx, MetricName::TrafficPacketsRx, MetricName::TrafficBytesRxByInstance, MetricName::TrafficPacketsRxByInstance, InstanceLabelKind::From, )); let traffic_control_rx_metrics = Arc::new(LogicalTrafficMetrics::new( stats_manager.clone(), global_ctx.get_network_name(), MetricName::TrafficControlBytesRx, MetricName::TrafficControlPacketsRx, MetricName::TrafficControlBytesRxByInstance, MetricName::TrafficControlPacketsRxByInstance, InstanceLabelKind::From, )); let route_algo_inst_for_metrics = route_algo_inst.clone(); let traffic_metrics = Arc::new(TrafficMetricRecorder::new( my_peer_id, traffic_tx_metrics, traffic_control_tx_metrics, traffic_rx_metrics, traffic_control_rx_metrics, move |peer_id| { let route_algo_inst = route_algo_inst_for_metrics.clone(); async move { match &route_algo_inst { RouteAlgoInst::Ospf(route) => route .get_peer_info(peer_id) .await .as_ref() .and_then(route_peer_info_instance_id), RouteAlgoInst::None => None, } } }, )); PeerManager { my_peer_id, global_ctx, nic_channel, tasks: Mutex::new(JoinSet::new()), packet_recv: Arc::new(Mutex::new(Some(packet_recv))), peers, peer_rpc_mgr, peer_rpc_tspt: rpc_tspt, peer_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())), nic_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())), route_algo_inst, foreign_network_manager, foreign_network_client, relay_peer_map, encryptor, data_compress_algo, exit_nodes: RwLock::new(exit_nodes), reserved_my_peer_id_map: DashMap::new(), recent_have_traffic: Arc::new(DashMap::new()), p2p_demand_notify: Arc::new(ExternalTaskSignal::new()), allow_loopback_tunnel: AtomicBool::new(true), self_tx_counters, traffic_metrics, peer_session_store, is_secure_mode_enabled, } } pub fn set_allow_loopback_tunnel(&self, allow_loopback_tunnel: bool) { self.allow_loopback_tunnel .store(allow_loopback_tunnel, std::sync::atomic::Ordering::Relaxed); } pub fn mark_recent_traffic(&self, dst_peer_id: PeerId) { if dst_peer_id == self.my_peer_id { return; } let flags = self.global_ctx.flags_arc(); if flags.disable_p2p || !flags.lazy_p2p || self.has_directly_connected_conn(dst_peer_id) { return; } let now = Instant::now(); if let Some(mut last_seen) = self.recent_have_traffic.get_mut(&dst_peer_id) { let should_notify = now.saturating_duration_since(*last_seen) > Self::RECENT_HAVE_TRAFFIC_TTL; *last_seen = now; if !should_notify { return; } } else { self.recent_have_traffic.insert(dst_peer_id, now); } self.p2p_demand_notify.notify(); } pub fn has_recent_traffic(&self, peer_id: PeerId, now: Instant) -> bool { if self.has_directly_connected_conn(peer_id) { return false; } self.recent_have_traffic .get(&peer_id) .map(|last_seen| { now.saturating_duration_since(*last_seen) <= Self::RECENT_HAVE_TRAFFIC_TTL }) .unwrap_or(false) } pub fn clear_recent_traffic(&self, peer_id: PeerId) { self.recent_have_traffic.remove(&peer_id); } pub fn p2p_demand_notify(&self) -> Arc { self.p2p_demand_notify.clone() } fn gc_recent_traffic(&self) { Self::gc_recent_traffic_entries(&self.recent_have_traffic, Instant::now(), |peer_id| { self.has_directly_connected_conn(peer_id) }); } fn build_foreign_network_manager_accessor( peer_map: &Arc, ) -> Box { struct T { peer_map: Weak, } #[async_trait::async_trait] impl GlobalForeignNetworkAccessor for T { async fn list_global_foreign_peer( &self, network_identity: &NetworkIdentity, ) -> Vec { let Some(peer_map) = self.peer_map.upgrade() else { return vec![]; }; peer_map .list_peers_own_foreign_network(network_identity) .await } } Box::new(T { peer_map: Arc::downgrade(peer_map), }) } async fn add_new_peer_conn(&self, peer_conn: PeerConn) -> Result<(), Error> { let my_identity = self.global_ctx.get_network_identity(); let peer_identity = peer_conn.get_network_identity(); let conn_info = peer_conn.get_conn_info(); let local_secure_mode = self .global_ctx .config .get_secure_mode() .as_ref() .map(|cfg| cfg.enabled) .unwrap_or(false); let peer_secure_mode = !conn_info.noise_remote_static_pubkey.is_empty(); if local_secure_mode != peer_secure_mode { return Err(Error::SecretKeyError( "same-network peers must use the same secure mode".to_string(), )); } // For credential nodes, network_secret_digest is either None or all-zeros // (all-zeros when received over the wire via handshake). // In this case, only compare network_name. let my_digest_empty = my_identity .network_secret_digest .as_ref() .is_none_or(|d| d.iter().all(|b| *b == 0)); let peer_digest_empty = peer_identity .network_secret_digest .as_ref() .is_none_or(|d| d.iter().all(|b| *b == 0)); let identity_ok = if my_digest_empty || peer_digest_empty { // Credential node: only check network_name my_identity.network_name == peer_identity.network_name } else { my_identity == peer_identity }; if !identity_ok { return Err(Error::SecretKeyError( "network identity not match".to_string(), )); } let peer_id = peer_conn.get_peer_id(); self.peers.add_new_peer_conn(peer_conn).await?; self.clear_recent_traffic(peer_id); Ok(()) } pub async fn add_client_tunnel( &self, tunnel: Box, is_directly_connected: bool, ) -> Result<(PeerId, PeerConnId), Error> { self.add_client_tunnel_with_peer_id_hint(tunnel, is_directly_connected, None) .await } pub async fn add_client_tunnel_with_peer_id_hint( &self, tunnel: Box, is_directly_connected: bool, peer_id_hint: Option, ) -> Result<(PeerId, PeerConnId), Error> { let mut peer = PeerConn::new_with_peer_id_hint( self.my_peer_id, self.global_ctx.clone(), tunnel, peer_id_hint, self.peer_session_store.clone(), ); peer.set_is_hole_punched(!is_directly_connected); peer.do_handshake_as_client().await?; let conn_id = peer.get_conn_id(); let peer_id = peer.get_peer_id(); if peer.get_network_identity().network_name == self.global_ctx.get_network_identity().network_name { self.add_new_peer_conn(peer).await?; } else { self.foreign_network_client.add_new_peer_conn(peer).await?; } Ok((peer_id, conn_id)) } pub fn has_directly_connected_conn(&self, peer_id: PeerId) -> bool { if let Some(peer) = self.peers.get_peer_by_id(peer_id) { peer.has_directly_connected_conn() } else { self.foreign_network_client.get_peer_map().has_peer(peer_id) } } #[tracing::instrument] pub async fn try_direct_connect(&self, connector: C) -> Result<(PeerId, PeerConnId), Error> where C: TunnelConnector + Debug, { self.try_direct_connect_with_peer_id_hint(connector, None) .await } #[tracing::instrument] pub async fn try_direct_connect_with_peer_id_hint( &self, mut connector: C, peer_id_hint: Option, ) -> Result<(PeerId, PeerConnId), Error> where C: TunnelConnector + Debug, { let ns = self.global_ctx.net_ns.clone(); let t = ns .run_async(|| async move { connector.connect().await }) .await?; self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint) .await } // avoid loop back to virtual network fn check_remote_addr_not_from_virtual_network( &self, tunnel: &dyn Tunnel, ) -> Result<(), anyhow::Error> { tracing::info!("check remote addr not from virtual network"); let Some(tunnel_info) = tunnel.info() else { anyhow::bail!("tunnel info is not set"); }; let Some(src) = tunnel_info.remote_addr.map(url::Url::from) else { anyhow::bail!("tunnel info remote addr is not set"); }; if src.scheme() == "ring" { return Ok(()); } let Ok(Some(addr)) = src.socket_addrs(|| Some(1)).map(|x| x.first().cloned()) else { // if the tunnel is not rely on ip address, skip check return Ok(()); }; // if no-tun is enabled, the src ip of packet in virtual network is converted to loopback address // we already filter out the connection in tcp/quic/kcp proxy so no need check here. if addr.ip().is_loopback() { // allow other loopback address, good for conn from cdn/l4 connection return Ok(()); } if self.global_ctx.is_ip_in_same_network(&addr.ip()) { anyhow::bail!( "tunnel src {} is from the same network (ignore this error please)", addr ); } Ok(()) } fn release_reserved_peer_id(&self, network_name: &str) { self.reserved_my_peer_id_map.remove(network_name); shrink_dashmap(&self.reserved_my_peer_id_map, None); } #[tracing::instrument(ret)] pub async fn add_tunnel_as_server( &self, tunnel: Box, is_directly_connected: bool, ) -> Result<(), Error> { tracing::info!("add tunnel as server start"); self.check_remote_addr_not_from_virtual_network(&tunnel)?; let mut conn = PeerConn::new( self.my_peer_id, self.global_ctx.clone(), tunnel, self.peer_session_store.clone(), ); let mut reserved_peer_id_network_name = None; let handshake_ret = conn.do_handshake_as_server_ext(|peer, network_name:&str| { if network_name == self.global_ctx.get_network_identity().network_name { return Ok(()); } let mut peer_id = self .foreign_network_manager .get_network_peer_id(network_name); if peer_id.is_none() { reserved_peer_id_network_name = Some(network_name.to_string()); peer_id = Some(*self.reserved_my_peer_id_map.entry(network_name.to_string()).or_insert_with(|| { rand::random::() }).value()); } peer.set_peer_id(peer_id.unwrap()); tracing::info!( ?peer_id, ?network_name, "handshake as server with foreign network, new peer id: {}, peer id in foreign manager: {:?}", peer.get_my_peer_id(), peer_id ); Ok(()) }) .await; if let Err(err) = handshake_ret { if let Some(network_name) = reserved_peer_id_network_name { self.release_reserved_peer_id(&network_name); } return Err(err); } let peer_identity = conn.get_network_identity(); let peer_network_name = peer_identity.network_name.clone(); let my_identity = self.global_ctx.get_network_identity(); let is_local_network = peer_network_name == my_identity.network_name; let trusted_foreign_credential = matches!(conn.get_peer_identity_type(), PeerIdentityType::Credential) && self .foreign_network_manager .is_existing_credential_pubkey_trusted( &peer_network_name, &conn.get_conn_info().noise_remote_static_pubkey, ); let foreign_network_allowed = conn.matches_local_network_secret() || trusted_foreign_credential; if !is_local_network && self.global_ctx.get_flags().private_mode && !foreign_network_allowed { self.release_reserved_peer_id(&peer_network_name); return Err(Error::SecretKeyError( "private mode is turned on, foreign network secret mismatch".to_string(), )); } conn.set_is_hole_punched(!is_directly_connected); let add_peer_ret = if is_local_network { self.add_new_peer_conn(conn).await } else { self.foreign_network_manager.add_peer_conn(conn).await }; if let Err(err) = add_peer_ret { self.release_reserved_peer_id(&peer_network_name); return Err(err); } self.release_reserved_peer_id(&peer_network_name); tracing::info!("add tunnel as server done"); Ok(()) } async fn try_handle_foreign_network_packet( mut packet: ZCPacket, my_peer_id: PeerId, peer_map: &PeerMap, foreign_network_mgr: &ForeignNetworkManager, disable_relay_data: bool, ) -> Result<(), ZCPacket> { let pm_header = packet.peer_manager_header().unwrap(); if pm_header.packet_type != PacketType::ForeignNetworkPacket as u8 { return Err(packet); } let from_peer_id = pm_header.from_peer_id.get(); let to_peer_id = pm_header.to_peer_id.get(); if disable_relay_data && Self::is_relay_data_zc_packet(&packet) { tracing::debug!( ?from_peer_id, ?to_peer_id, inner_packet_type = ?packet.foreign_network_inner_packet_type(), "drop foreign network relay data while relay data is disabled" ); return Ok(()); } let foreign_hdr = packet.foreign_network_hdr().unwrap(); let foreign_network_name = foreign_hdr.get_network_name(packet.payload()); let foreign_peer_id = foreign_hdr.get_dst_peer_id(); let foreign_network_my_peer_id = foreign_network_mgr.get_network_peer_id(&foreign_network_name); let buf_len = packet.buf_len(); let stats_manager = peer_map.get_global_ctx().stats_manager().clone(); let label_set = LabelSet::new().with_label_type(LabelType::NetworkName(foreign_network_name.clone())); let add_counter = move |bytes_metric, packets_metric| { stats_manager .get_counter(bytes_metric, label_set.clone()) .add(buf_len as u64); stats_manager.get_counter(packets_metric, label_set).inc(); }; // NOTICE: the to peer id is modified by the src from foreign network my peer id to the origin my peer id if to_peer_id == my_peer_id { // packet sent from other peer to me, extract the inner packet and forward it add_counter( MetricName::TrafficBytesForeignForwardRx, MetricName::TrafficPacketsForeignForwardRx, ); if let Err(e) = foreign_network_mgr .forward_foreign_network_packet( &foreign_network_name, foreign_peer_id, packet.foreign_network_packet(), ) .await { tracing::debug!( ?e, ?foreign_network_name, ?foreign_peer_id, "foreign network mgr send_msg_to_peer failed" ); } Ok(()) } else if Some(from_peer_id) == foreign_network_my_peer_id { // to_peer_id is my peer id for the foreign network, need to convert to the origin my_peer_id of dst let Some(to_peer_id) = peer_map .get_origin_my_peer_id(&foreign_network_name, to_peer_id) .await else { tracing::debug!( ?foreign_network_name, ?to_peer_id, "cannot find origin my peer id for foreign network." ); return Err(packet); }; add_counter( MetricName::TrafficBytesForeignForwardTx, MetricName::TrafficPacketsForeignForwardTx, ); // modify the to_peer id from foreign network my peer id to the origin my peer id packet .mut_peer_manager_header() .unwrap() .to_peer_id .set(to_peer_id); // packet is generated from foreign network mgr and should be forward to other peer if let Err(e) = peer_map .send_msg(packet, to_peer_id, NextHopPolicy::LeastHop) .await { tracing::debug!( ?e, ?to_peer_id, "send_msg_directly failed when forward local generated foreign network packet" ); } Ok(()) } else { // target is not me, forward it. try get origin peer id add_counter( MetricName::TrafficBytesForeignForwardForwarded, MetricName::TrafficPacketsForeignForwardForwarded, ); Err(packet) } } fn is_relay_data_packet(packet_type: u8) -> bool { is_relay_data_packet_type(packet_type) } fn is_relay_data_zc_packet(packet: &ZCPacket) -> bool { let Some(hdr) = packet.peer_manager_header() else { return false; }; if hdr.packet_type == PacketType::ForeignNetworkPacket as u8 { let inner_packet_type = packet.foreign_network_inner_packet_type(); if inner_packet_type.is_none() { tracing::warn!( ?hdr, "foreign network packet has unparseable inner peer manager header" ); } return inner_packet_type.is_none_or(Self::is_relay_data_packet); } Self::is_relay_data_packet(hdr.packet_type) } async fn start_peer_recv(&self) { let mut recv = self.packet_recv.lock().await.take().unwrap(); let my_peer_id = self.my_peer_id; let peers = self.peers.clone(); let pipe_line = self.peer_packet_process_pipeline.clone(); let foreign_client = self.foreign_network_client.clone(); let relay_peer_map = self.relay_peer_map.clone(); let foreign_mgr = self.foreign_network_manager.clone(); let encryptor = self.encryptor.clone(); let compress_algo = self.data_compress_algo; let acl_filter = self.global_ctx.get_acl_filter().clone(); let global_ctx = self.global_ctx.clone(); let secure_mode_enabled = self.is_secure_mode_enabled; let stats_mgr = self.global_ctx.stats_manager().clone(); let route = self.get_route(); let is_credential_node = self .global_ctx .get_network_identity() .network_secret .is_none() && secure_mode_enabled; let label_set = LabelSet::new().with_label_type(LabelType::NetworkName(global_ctx.get_network_name())); let self_tx_bytes = self.self_tx_counters.self_tx_bytes.clone(); let self_tx_packets = self.self_tx_counters.self_tx_packets.clone(); let self_rx_bytes = stats_mgr.get_counter(MetricName::TrafficBytesSelfRx, label_set.clone()); let self_rx_packets = stats_mgr.get_counter(MetricName::TrafficPacketsSelfRx, label_set.clone()); let forward_data_tx_bytes = stats_mgr.get_counter(MetricName::TrafficBytesForwarded, label_set.clone()); let forward_data_tx_packets = stats_mgr.get_counter(MetricName::TrafficPacketsForwarded, label_set.clone()); let forward_control_tx_bytes = stats_mgr.get_counter(MetricName::TrafficControlBytesForwarded, label_set.clone()); let forward_control_tx_packets = stats_mgr.get_counter( MetricName::TrafficControlPacketsForwarded, label_set.clone(), ); let compress_tx_bytes_before = self.self_tx_counters.compress_tx_bytes_before.clone(); let compress_tx_bytes_after = self.self_tx_counters.compress_tx_bytes_after.clone(); let compress_rx_bytes_before = stats_mgr.get_counter(MetricName::CompressionBytesRxBefore, label_set.clone()); let compress_rx_bytes_after = stats_mgr.get_counter(MetricName::CompressionBytesRxAfter, label_set.clone()); let traffic_metrics = self.traffic_metrics.clone(); self.tasks.lock().await.spawn(async move { tracing::trace!("start_peer_recv"); while let Ok(ret) = recv_packet_from_chan(&mut recv).await { let disable_relay_data = global_ctx.flags_arc().disable_relay_data; let Err(mut ret) = Self::try_handle_foreign_network_packet( ret, my_peer_id, &peers, &foreign_mgr, disable_relay_data, ) .await else { continue; }; let buf_len = ret.buf_len(); let is_relay_data_packet = Self::is_relay_data_zc_packet(&ret); let Some(hdr) = ret.mut_peer_manager_header() else { tracing::warn!(?ret, "invalid packet, skip"); continue; }; tracing::trace!(?hdr, "peer recv a packet..."); let from_peer_id = hdr.from_peer_id.get(); let to_peer_id = hdr.to_peer_id.get(); let packet_type = hdr.packet_type; let is_encrypted = hdr.is_encrypted(); if to_peer_id != my_peer_id { if disable_relay_data && is_relay_data_packet { tracing::debug!( ?from_peer_id, ?to_peer_id, packet_type, "drop forwarded relay data while relay data is disabled" ); continue; } if hdr.forward_counter > 7 { tracing::warn!(?hdr, "forward counter exceed, drop packet"); continue; } // Step 10b: credential nodes don't forward handshake packets if is_credential_node && (packet_type == PacketType::HandShake as u8 || packet_type == PacketType::NoiseHandshakeMsg1 as u8 || packet_type == PacketType::NoiseHandshakeMsg2 as u8 || packet_type == PacketType::NoiseHandshakeMsg3 as u8) { tracing::debug!("credential node dropping forwarded handshake packet"); continue; } if hdr.forward_counter > 2 && hdr.is_latency_first() { tracing::trace!(?hdr, "set_latency_first false because too many hop"); hdr.set_latency_first(false); } hdr.forward_counter += 1; if from_peer_id == my_peer_id { compress_tx_bytes_before.add(buf_len as u64); if packet_type == PacketType::Data as u8 || packet_type == PacketType::KcpSrc as u8 || packet_type == PacketType::KcpDst as u8 { let _ = Self::try_compress_and_encrypt( compress_algo, &encryptor, &mut ret, secure_mode_enabled, ) .await; } compress_tx_bytes_after.add(ret.buf_len() as u64); self_tx_bytes.add(ret.buf_len() as u64); self_tx_packets.inc(); } else { match traffic_kind(packet_type) { TrafficKind::Data => { forward_data_tx_bytes.add(buf_len as u64); forward_data_tx_packets.inc(); } TrafficKind::Control => { forward_control_tx_bytes.add(buf_len as u64); forward_control_tx_packets.inc(); } } } tracing::trace!(?to_peer_id, ?my_peer_id, "need forward"); let tx_metrics = if from_peer_id == my_peer_id { Some(&traffic_metrics) } else { None }; let ret = Self::send_msg_internal( &peers, &foreign_client, &relay_peer_map, tx_metrics, ret, to_peer_id, ) .await; if ret.is_err() { tracing::error!(?ret, ?to_peer_id, ?from_peer_id, "forward packet error"); } } else { if packet_type == PacketType::RelayHandshake as u8 || packet_type == PacketType::RelayHandshakeAck as u8 { let _ = relay_peer_map.handle_handshake_packet(ret).await; continue; } if !secure_mode_enabled { if let Err(e) = encryptor.decrypt(&mut ret) { tracing::error!(?e, "decrypt failed"); continue; } } else if is_encrypted { match relay_peer_map.decrypt_if_needed(&mut ret).await { Ok(true) => {} Ok(false) => { tracing::error!("secure session not found"); continue; } Err(e) => { tracing::error!(?e, "secure decrypt failed"); continue; } } } self_rx_bytes.add(buf_len as u64); self_rx_packets.inc(); traffic_metrics .record_rx(from_peer_id, packet_type, buf_len as u64) .await; compress_rx_bytes_before.add(buf_len as u64); let compressor = DefaultCompressor {}; if let Err(e) = compressor.decompress(&mut ret).await { tracing::error!(?e, "decompress failed"); continue; } compress_rx_bytes_after.add(ret.buf_len() as u64); if !acl_filter.process_packet_with_acl( &ret, true, global_ctx.get_ipv4().map(|x| x.address()), |dst| global_ctx.is_ip_local_ipv6(&dst), &route, ) { continue; } let mut processed = false; let mut zc_packet = Some(ret); tracing::trace!(?zc_packet, "try_process_packet_from_peer"); for pipeline in pipe_line.read().await.iter().rev() { zc_packet = pipeline .try_process_packet_from_peer(zc_packet.unwrap()) .await; if zc_packet.is_none() { processed = true; break; } } if !processed { tracing::error!(?zc_packet, "unhandled packet"); } } } panic!("done_peer_recv"); }); } pub async fn add_packet_process_pipeline(&self, pipeline: BoxPeerPacketFilter) { // newest pipeline will be executed first self.peer_packet_process_pipeline .write() .await .push(pipeline); } pub async fn add_nic_packet_process_pipeline(&self, pipeline: BoxNicPacketFilter) { // newest pipeline will be executed first self.nic_packet_process_pipeline .write() .await .push(pipeline); } async fn init_packet_process_pipeline(&self) { // for tun/tap ip/eth packet. struct NicPacketProcessor { nic_channel: PacketRecvChan, } #[async_trait::async_trait] impl PeerPacketFilter for NicPacketProcessor { async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option { let hdr = packet.peer_manager_header().unwrap(); if hdr.packet_type == PacketType::Data as u8 && !hdr.is_not_send_to_tun() { if hdr.is_encrypted() || hdr.is_compressed() { tracing::warn!( from_peer_id = hdr.from_peer_id.get(), to_peer_id = hdr.to_peer_id.get(), encrypted = hdr.is_encrypted(), compressed = hdr.is_compressed(), "dropping packet before nic because it is not fully decoded" ); return None; } tracing::trace!(?packet, "send packet to nic channel"); // TODO: use a function to get the body ref directly for zero copy let _ = self.nic_channel.send(packet).await; None } else { Some(packet) } } } self.add_packet_process_pipeline(Box::new(NicPacketProcessor { nic_channel: self.nic_channel.clone(), })) .await; // for peer rpc packet struct PeerRpcPacketProcessor { peer_rpc_tspt_sender: UnboundedSender, } #[async_trait::async_trait] impl PeerPacketFilter for PeerRpcPacketProcessor { async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option { let hdr = packet.peer_manager_header().unwrap(); if hdr.packet_type == PacketType::TaRpc as u8 || hdr.packet_type == PacketType::RpcReq as u8 || hdr.packet_type == PacketType::RpcResp as u8 { self.peer_rpc_tspt_sender.send(packet).unwrap(); None } else { Some(packet) } } } self.add_packet_process_pipeline(Box::new(PeerRpcPacketProcessor { peer_rpc_tspt_sender: self.peer_rpc_tspt.peer_rpc_tspt_sender.clone(), })) .await; } pub async fn add_route(&self, route: T) where T: Route + PeerPacketFilter + Send + Sync + Clone + 'static, { // for route self.add_packet_process_pipeline(Box::new(route.clone())) .await; struct Interface { my_peer_id: PeerId, peers: Weak, foreign_network_client: Weak, foreign_network_manager: Weak, } #[async_trait] impl RouteInterface for Interface { async fn list_peers(&self) -> Vec { let Some(foreign_client) = self.foreign_network_client.upgrade() else { return vec![]; }; let Some(peer_map) = self.peers.upgrade() else { return vec![]; }; let mut peers = foreign_client.list_public_peers().await; peers.extend(peer_map.list_peers_with_conn().await); peers } fn my_peer_id(&self) -> PeerId { self.my_peer_id } async fn close_peer(&self, peer_id: PeerId) { if let Some(peer_map) = self.peers.upgrade() { let _ = peer_map.close_peer(peer_id).await; } if let Some(foreign_client) = self.foreign_network_client.upgrade() { let _ = foreign_client.get_peer_map().close_peer(peer_id).await; } } async fn get_peer_public_key(&self, peer_id: PeerId) -> Option> { let peer_map = self.peers.upgrade()?; peer_map.get_peer_public_key(peer_id) } async fn get_peer_identity_type(&self, peer_id: PeerId) -> Option { let peer_map = self.peers.upgrade()?; peer_map.get_peer_identity_type(peer_id) } async fn list_foreign_networks(&self) -> ForeignNetworkRouteInfoMap { let ret = DashMap::new(); let Some(foreign_mgr) = self.foreign_network_manager.upgrade() else { return ret; }; let networks = foreign_mgr.list_foreign_networks().await; for (network_name, info) in networks.foreign_networks.iter() { if info.peers.is_empty() { continue; } let last_update = foreign_mgr .get_foreign_network_last_update(network_name) .unwrap_or(SystemTime::now()); ret.insert( ForeignNetworkRouteInfoKey { peer_id: self.my_peer_id, network_name: network_name.clone(), }, ForeignNetworkRouteInfoEntry { foreign_peer_ids: info.peers.iter().map(|x| x.peer_id).collect(), last_update: Some(last_update.into()), version: 0, network_secret_digest: info.network_secret_digest.clone(), my_peer_id_for_this_network: info.my_peer_id_for_this_network, }, ); } ret } } let my_peer_id = self.my_peer_id; let _route_id = route .open(Box::new(Interface { my_peer_id, peers: Arc::downgrade(&self.peers), foreign_network_client: Arc::downgrade(&self.foreign_network_client), foreign_network_manager: Arc::downgrade(&self.foreign_network_manager), })) .await .unwrap(); let arc_route: ArcRoute = Arc::new(Box::new(route)); self.peers.add_route(arc_route).await; } pub fn get_route(&self) -> Box { match &self.route_algo_inst { RouteAlgoInst::Ospf(route) => Box::new(route.clone()), RouteAlgoInst::None => Box::new(MockRoute {}), } } pub async fn list_routes(&self) -> Vec { self.get_route().list_routes().await } pub async fn get_route_peer_info_last_update_time(&self) -> Instant { self.get_route().get_peer_info_last_update_time().await } pub async fn list_proxy_cidrs(&self) -> BTreeSet { self.get_route().list_proxy_cidrs().await } pub async fn list_proxy_cidrs_v6(&self) -> BTreeSet { self.get_route().list_proxy_cidrs_v6().await } pub async fn list_public_ipv6_routes(&self) -> BTreeSet { self.get_route().list_public_ipv6_routes().await } pub async fn get_my_public_ipv6_addr(&self) -> Option { self.get_route().get_my_public_ipv6_addr().await } pub async fn get_local_public_ipv6_info(&self) -> instance::ListPublicIpv6InfoResponse { self.get_route().get_local_public_ipv6_info().await } pub async fn dump_route(&self) -> String { self.get_route().dump().await } pub async fn list_global_foreign_network(&self) -> ListGlobalForeignNetworkResponse { let mut resp = ListGlobalForeignNetworkResponse::default(); let ret = self.get_route().list_foreign_network_info().await; for info in ret.infos.iter() { let entry = resp .foreign_networks .entry(info.key.as_ref().unwrap().peer_id) .or_insert_with(Default::default); let Some(route_info) = info.value.as_ref() else { continue; }; let f = OneForeignNetwork { network_name: info.key.as_ref().unwrap().network_name.clone(), peer_ids: route_info.foreign_peer_ids.clone(), last_updated: format!("{}", route_info.last_update.unwrap()), version: route_info.version, }; entry.foreign_networks.push(f); } resp } pub async fn get_foreign_network_summary(&self) -> RouteForeignNetworkSummary { self.get_route().get_foreign_network_summary().await } async fn run_nic_packet_process_pipeline(&self, data: &mut ZCPacket) -> bool { // Enforce ACL for outbound (NIC-originated) packets. If ACL denies, stop processing. if !self.global_ctx.get_acl_filter().process_packet_with_acl( data, false, None, |_| false, &self.get_route(), ) { return false; } for pipeline in self.nic_packet_process_pipeline.read().await.iter().rev() { let _ = pipeline.try_process_packet_from_nic(data).await; } true } pub async fn remove_nic_packet_process_pipeline(&self, id: String) -> Result<(), Error> { let mut pipelines = self.nic_packet_process_pipeline.write().await; if let Some(pos) = pipelines.iter().position(|x| x.id() == id) { pipelines.remove(pos); Ok(()) } else { Err(Error::NotFound) } } fn get_next_hop_policy(is_first_latency: bool) -> NextHopPolicy { if is_first_latency { NextHopPolicy::LeastCost } else { NextHopPolicy::LeastHop } } fn check_p2p_only_before_send(&self, dst_peer_id: PeerId) -> Result<(), Error> { if self.global_ctx.p2p_only() && !self.peers.has_peer(dst_peer_id) { return Err(Error::RouteError(None)); } Ok(()) } pub async fn send_msg_for_proxy( &self, mut msg: ZCPacket, dst_peer_id: PeerId, ) -> Result<(), Error> { self.mark_recent_traffic(dst_peer_id); self.check_p2p_only_before_send(dst_peer_id)?; self.self_tx_counters .compress_tx_bytes_before .add(msg.buf_len() as u64); Self::try_compress_and_encrypt( self.data_compress_algo, &self.encryptor, &mut msg, self.is_secure_mode_enabled, ) .await?; self.self_tx_counters .compress_tx_bytes_after .add(msg.buf_len() as u64); let msg_len = msg.buf_len() as u64; let result = Self::send_msg_internal( &self.peers, &self.foreign_network_client, &self.relay_peer_map, Some(&self.traffic_metrics), msg, dst_peer_id, ) .await; if result.is_ok() { self.self_tx_counters.self_tx_bytes.add(msg_len); self.self_tx_counters.self_tx_packets.inc(); } result } async fn send_msg_internal( peers: &Arc, foreign_network_client: &Arc, relay_peer_map: &Arc, direct_tx_metrics: Option<&Arc>, msg: ZCPacket, dst_peer_id: PeerId, ) -> Result<(), Error> { let policy = Self::get_next_hop_policy(msg.peer_manager_header().unwrap().is_latency_first()); let packet_type = msg.peer_manager_header().unwrap().packet_type; let msg_len = msg.buf_len() as u64; let send_result = if peers.has_peer(dst_peer_id) { peers.send_msg_directly(msg, dst_peer_id).await } else if foreign_network_client.has_next_hop(dst_peer_id) { foreign_network_client.send_msg(msg, dst_peer_id).await } else if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy.clone()).await { if peers.has_peer(gateway) || foreign_network_client.has_next_hop(gateway) { relay_peer_map.send_msg(msg, dst_peer_id, policy).await } else { tracing::warn!( ?gateway, ?dst_peer_id, "cannot send msg to peer through gateway" ); Err(Error::RouteError(None)) } } else if foreign_network_client.has_next_hop(dst_peer_id) { // check foreign network again. so in happy path we can avoid extra check foreign_network_client.send_msg(msg, dst_peer_id).await } else { tracing::debug!(?dst_peer_id, "no gateway for peer"); Err(Error::RouteError(None)) }; if send_result.is_ok() && let Some(metrics) = direct_tx_metrics { metrics.record_tx(dst_peer_id, packet_type, msg_len).await; } send_result } pub async fn get_msg_dst_peer(&self, addr: &IpAddr) -> (Vec, bool) { match addr { IpAddr::V4(ipv4_addr) => self.get_msg_dst_peer_ipv4(ipv4_addr).await, IpAddr::V6(ipv6_addr) => self.get_msg_dst_peer_ipv6(ipv6_addr).await, } } fn is_all_peers_broadcast_ipv4(&self, ipv4_addr: &Ipv4Addr) -> bool { let network_length = self .global_ctx .get_ipv4() .map(|x| x.network_length()) .unwrap_or(24); let ipv4_inet = cidr::Ipv4Inet::new(*ipv4_addr, network_length).unwrap(); ipv4_addr.is_broadcast() || ipv4_addr.is_multicast() || *ipv4_addr == ipv4_inet.last_address() } fn is_all_peers_broadcast_ipv6(&self, ipv6_addr: &Ipv6Addr) -> bool { let network_length = self .global_ctx .get_ipv6() .map(|x| x.network_length()) .unwrap_or(64); let ipv6_inet = cidr::Ipv6Inet::new(*ipv6_addr, network_length).unwrap(); ipv6_addr.is_multicast() || *ipv6_addr == ipv6_inet.last_address() } pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec, bool) { let mut is_exit_node = false; let mut dst_peers = vec![]; if self.is_all_peers_broadcast_ipv4(ipv4_addr) { dst_peers.extend(self.peers.list_routes().await.iter().filter_map(|x| { if *x.key() != self.my_peer_id { Some(*x.key()) } else { None } })); } else if let Some(peer_id) = self.peers.get_peer_id_by_ipv4(ipv4_addr).await { dst_peers.push(peer_id); } else if !self .global_ctx .is_ip_in_same_network(&std::net::IpAddr::V4(*ipv4_addr)) { for exit_node in self.exit_nodes.read().await.iter() { let IpAddr::V4(exit_node) = exit_node else { continue; }; if let Some(peer_id) = self.peers.get_peer_id_by_ipv4(exit_node).await { dst_peers.push(peer_id); is_exit_node = true; break; } } } #[cfg(target_env = "ohos")] { if dst_peers.is_empty() && !self .global_ctx .is_ip_in_same_network(&std::net::IpAddr::V4(*ipv4_addr)) { tracing::trace!("no peer id for ipv4: {}, set exit_node for ohos", ipv4_addr); dst_peers.push(self.my_peer_id.clone()); is_exit_node = true; } } (dst_peers, is_exit_node) } pub async fn get_msg_dst_peer_ipv6(&self, ipv6_addr: &Ipv6Addr) -> (Vec, bool) { let mut is_exit_node = false; let mut dst_peers = vec![]; if self.is_all_peers_broadcast_ipv6(ipv6_addr) { dst_peers.extend(self.peers.list_routes().await.iter().map(|x| *x.key())); } else if let Some(peer_id) = self.peers.get_peer_id_by_ipv6(ipv6_addr).await { dst_peers.push(peer_id); } else if !ipv6_addr.is_unicast_link_local() && let Some(peer_id) = self.get_route().get_public_ipv6_gateway_peer_id().await { dst_peers.push(peer_id); } else if !ipv6_addr.is_unicast_link_local() { // NOTE: never route link local address to exit node. for exit_node in self.exit_nodes.read().await.iter() { let IpAddr::V6(exit_node) = exit_node else { continue; }; if let Some(peer_id) = self.peers.get_peer_id_by_ipv6(exit_node).await { dst_peers.push(peer_id); is_exit_node = true; break; } } } (dst_peers, is_exit_node) } pub async fn try_compress_and_encrypt( compress_algo: CompressorAlgo, encryptor: &Arc, msg: &mut ZCPacket, secure_mode_enabled: bool, ) -> Result<(), Error> { let compressor = DefaultCompressor {}; compressor .compress(msg, compress_algo) .await .with_context(|| "compress failed")?; if !secure_mode_enabled { encryptor.encrypt(msg).with_context(|| "encrypt failed")?; } Ok(()) } pub async fn send_msg_by_ip( &self, mut msg: ZCPacket, ip_addr: IpAddr, not_send_to_self: bool, ) -> Result<(), Error> { tracing::trace!( "do send_msg in peer manager, msg: {:?}, ip_addr: {}", msg, ip_addr ); msg.fill_peer_manager_hdr( self.my_peer_id, 0, tunnel::packet_def::PacketType::Data as u8, ); if !self.run_nic_packet_process_pipeline(&mut msg).await { return Ok(()); } let cur_to_peer_id = msg.peer_manager_header().unwrap().to_peer_id.into(); if cur_to_peer_id != 0 { self.mark_recent_traffic(cur_to_peer_id); return Self::send_msg_internal( &self.peers, &self.foreign_network_client, &self.relay_peer_map, Some(&self.traffic_metrics), msg, cur_to_peer_id, ) .await; } let (dst_peers, is_exit_node) = match ip_addr { IpAddr::V4(ipv4_addr) => self.get_msg_dst_peer_ipv4(&ipv4_addr).await, IpAddr::V6(ipv6_addr) => self.get_msg_dst_peer_ipv6(&ipv6_addr).await, }; if dst_peers.is_empty() { tracing::info!("no peer id for ip: {}", ip_addr); return Ok(()); } self.self_tx_counters .compress_tx_bytes_before .add(msg.buf_len() as u64); Self::try_compress_and_encrypt( self.data_compress_algo, &self.encryptor, &mut msg, self.is_secure_mode_enabled, ) .await?; self.self_tx_counters .compress_tx_bytes_after .add(msg.buf_len() as u64); let is_latency_first = self.global_ctx.latency_first(); msg.mut_peer_manager_header() .unwrap() .set_latency_first(is_latency_first) .set_exit_node(is_exit_node); let mut errs: Vec = vec![]; let mut msg = Some(msg); let total_dst_peers = dst_peers.len(); let should_mark_recent_traffic = Self::should_mark_recent_traffic_for_fanout(total_dst_peers); for (i, peer_id) in dst_peers.iter().enumerate() { if should_mark_recent_traffic { self.mark_recent_traffic(*peer_id); } if let Err(e) = self.check_p2p_only_before_send(*peer_id) { errs.push(e); continue; } let mut msg = if i == total_dst_peers - 1 { msg.take().unwrap() } else { msg.clone().unwrap() }; let hdr = msg.mut_peer_manager_header().unwrap(); hdr.to_peer_id.set(*peer_id); #[cfg(not(target_env = "ohos"))] { if not_send_to_self && *peer_id == self.my_peer_id && !self.global_ctx.is_ip_local_virtual_ip(&ip_addr) { // Keep the loop-prevention flags for proxy-induced self-delivery where // the destination is not this node's own EasyTier-managed IP. hdr.set_not_send_to_tun(true); hdr.set_no_proxy(true); } } self.self_tx_counters .self_tx_bytes .add(msg.buf_len() as u64); self.self_tx_counters.self_tx_packets.inc(); if let Err(e) = Self::send_msg_internal( &self.peers, &self.foreign_network_client, &self.relay_peer_map, Some(&self.traffic_metrics), msg, *peer_id, ) .await { errs.push(e); } } tracing::trace!(?dst_peers, "do send_msg in peer manager done"); if errs.is_empty() { Ok(()) } else { tracing::error!(?errs, "send_msg has error"); Err(anyhow::anyhow!("send_msg has error: {:?}", errs).into()) } } async fn run_clean_peer_without_conn_routine(&self) { let peer_map = self.peers.clone(); self.tasks.lock().await.spawn(async move { loop { peer_map.clean_peer_without_conn().await; tokio::time::sleep(std::time::Duration::from_secs(3)).await; } }); } async fn run_relay_session_gc_routine(&self) { let relay_peer_map = self.relay_peer_map.clone(); self.tasks.lock().await.spawn(async move { loop { relay_peer_map.evict_idle_sessions(std::time::Duration::from_secs(60)); tokio::time::sleep(std::time::Duration::from_secs(30)).await; } }); } async fn run_recent_traffic_gc_routine(&self) { let recent_have_traffic = self.recent_have_traffic.clone(); let peers = self.peers.clone(); let foreign_network_client = self.foreign_network_client.clone(); self.tasks.lock().await.spawn(async move { loop { PeerManager::gc_recent_traffic_entries( recent_have_traffic.as_ref(), Instant::now(), |peer_id| { if let Some(peer) = peers.get_peer_by_id(peer_id) { peer.has_directly_connected_conn() } else { foreign_network_client.get_peer_map().has_peer(peer_id) } }, ); tokio::time::sleep(std::time::Duration::from_secs(30)).await; } }); } async fn run_peer_session_gc_routine(&self) { let peer_session_store = self.peer_session_store.clone(); self.tasks.lock().await.spawn(async move { loop { tokio::time::sleep(std::time::Duration::from_secs(60)).await; peer_session_store.evict_unused_sessions(); } }); } async fn run_traffic_metrics_gc_routine(&self) { let mut event_receiver = self.global_ctx.subscribe(); let traffic_metrics = self.traffic_metrics.clone(); self.tasks.lock().await.spawn(async move { loop { match event_receiver.recv().await { Ok(GlobalCtxEvent::PeerRemoved(peer_id)) => { traffic_metrics.remove_peer(peer_id); } Ok(_) => {} Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { tracing::warn!( skipped, "traffic metrics GC receiver lagged; clearing peer cache to avoid stale metric attribution" ); traffic_metrics.clear_peer_cache(); event_receiver = event_receiver.resubscribe(); } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } }); } async fn run_foriegn_network(&self) { self.peer_rpc_tspt .foreign_peers .lock() .await .replace(Arc::downgrade(&self.foreign_network_client)); self.foreign_network_client.run().await; } pub async fn run(&self) -> Result<(), Error> { match &self.route_algo_inst { RouteAlgoInst::Ospf(route) => self.add_route(route.clone()).await, RouteAlgoInst::None => {} }; self.init_packet_process_pipeline().await; self.peer_rpc_mgr.run(); self.start_peer_recv().await; self.run_clean_peer_without_conn_routine().await; self.run_relay_session_gc_routine().await; self.run_recent_traffic_gc_routine().await; self.run_peer_session_gc_routine().await; self.run_traffic_metrics_gc_routine().await; self.run_foriegn_network().await; Ok(()) } pub fn get_peer_map(&self) -> Arc { self.peers.clone() } pub fn get_relay_peer_map(&self) -> Arc { self.relay_peer_map.clone() } pub fn get_peer_rpc_mgr(&self) -> Arc { self.peer_rpc_mgr.clone() } pub fn get_peer_session_store(&self) -> Arc { self.peer_session_store.clone() } pub fn my_node_id(&self) -> uuid::Uuid { self.global_ctx.get_id() } pub fn my_peer_id(&self) -> PeerId { self.my_peer_id } pub fn get_global_ctx(&self) -> ArcGlobalCtx { self.global_ctx.clone() } pub fn get_global_ctx_ref(&self) -> &ArcGlobalCtx { &self.global_ctx } pub fn get_nic_channel(&self) -> PacketRecvChan { self.nic_channel.clone() } pub fn get_foreign_network_manager(&self) -> Arc { self.foreign_network_manager.clone() } pub fn get_foreign_network_client(&self) -> Arc { self.foreign_network_client.clone() } pub async fn get_my_info(&self) -> instance::NodeInfo { instance::NodeInfo { peer_id: self.my_peer_id, ipv4_addr: self .global_ctx .get_ipv4() .map(|x| x.to_string()) .unwrap_or_default(), proxy_cidrs: self .global_ctx .config .get_proxy_cidrs() .into_iter() .map(|x| match x.mapped_cidr { None => x.cidr.to_string(), Some(mapped) => format!("{}->{}", x.cidr, mapped), }) .collect(), hostname: self.global_ctx.get_hostname(), stun_info: Some(self.global_ctx.get_stun_info_collector().get_stun_info()), inst_id: self.global_ctx.get_id().to_string(), listeners: self .global_ctx .get_running_listeners() .iter() .map(|x| x.to_string()) .collect(), config: self.global_ctx.config.dump(), version: EASYTIER_VERSION.to_string(), feature_flag: Some(self.global_ctx.get_feature_flags()), ip_list: Some(self.global_ctx.get_ip_collector().collect_ip_addrs().await), public_ipv6_addr: self.get_my_public_ipv6_addr().await.map(Into::into), ipv6_public_addr_prefix: self .global_ctx .get_advertised_ipv6_public_addr_prefix() .map(|prefix| { cidr::Ipv6Inet::new(prefix.first_address(), prefix.network_length()) .unwrap() .into() }), } } pub async fn wait(&self) { while !self.tasks.lock().await.is_empty() { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } pub async fn clear_resources(&self) { let mut peer_pipeline = self.peer_packet_process_pipeline.write().await; peer_pipeline.clear(); let mut nic_pipeline = self.nic_packet_process_pipeline.write().await; nic_pipeline.clear(); self.peer_rpc_mgr.rpc_server().registry().unregister_all(); } pub async fn close_peer_conn( &self, peer_id: PeerId, conn_id: &PeerConnId, ) -> Result<(), Error> { let ret = self.peers.close_peer_conn(peer_id, conn_id).await; tracing::info!("close_peer_conn in peer map: {:?}", ret); if ret.is_ok() || !matches!(ret.as_ref().unwrap_err(), Error::NotFound) { return ret; } let ret = self .foreign_network_client .get_peer_map() .close_peer_conn(peer_id, conn_id) .await; tracing::info!("close_peer_conn in foreign network client: {:?}", ret); if ret.is_ok() || !matches!(ret.as_ref().unwrap_err(), Error::NotFound) { return ret; } let ret = self .foreign_network_manager .close_peer_conn(peer_id, conn_id) .await; tracing::info!("close_peer_conn in foreign network manager done: {:?}", ret); ret } pub async fn check_allow_kcp_to_dst(&self, dst_ip: &IpAddr) -> bool { let route = self.get_route(); let Some(dst_peer_id) = route.get_peer_id_by_ip(dst_ip).await else { return false; }; let Some(peer_info) = route.get_peer_info(dst_peer_id).await else { return false; }; // check dst allow kcp input if !peer_info.feature_flag.map(|x| x.kcp_input).unwrap_or(false) { return false; } let next_hop_policy = Self::get_next_hop_policy(self.global_ctx.get_flags().latency_first); // check relay node allow relay kcp. let Some(next_hop_id) = route .get_next_hop_with_policy(dst_peer_id, next_hop_policy) .await else { return false; }; if next_hop_id == dst_peer_id { // dst p2p, no need to relay return true; } let Some(next_hop_info) = route.get_peer_info(next_hop_id).await else { return false; }; // check next hop allow kcp relay if next_hop_info .feature_flag .map(|x| x.no_relay_kcp) .unwrap_or(false) { return false; } true } pub async fn check_allow_quic_to_dst(&self, dst_ip: &IpAddr) -> bool { let route = self.get_route(); let Some(dst_peer_id) = route.get_peer_id_by_ip(dst_ip).await else { return false; }; let Some(peer_info) = route.get_peer_info(dst_peer_id).await else { return false; }; // check dst allow quic input if !peer_info .feature_flag .map(|x| x.quic_input) .unwrap_or(false) { return false; } let next_hop_policy = Self::get_next_hop_policy(self.global_ctx.get_flags().latency_first); // check relay node allow relay quic. let Some(next_hop_id) = route .get_next_hop_with_policy(dst_peer_id, next_hop_policy) .await else { return false; }; if next_hop_id == dst_peer_id { // dst p2p, no need to relay return true; } let Some(next_hop_info) = route.get_peer_info(next_hop_id).await else { return false; }; // check next hop allow quic relay if next_hop_info .feature_flag .map(|x| x.no_relay_quic) .unwrap_or(false) { return false; } true } pub async fn update_exit_nodes(&self) { let exit_nodes = self.global_ctx.config.get_exit_nodes(); *self.exit_nodes.write().await = exit_nodes; } } #[cfg(test)] mod tests { use std::{ fmt::Debug, sync::Arc, time::{Duration, Instant}, }; use crate::{ common::{ config::Flags, global_ctx::{NetworkIdentity, tests::get_mock_global_ctx}, stats_manager::{LabelSet, LabelType, MetricName}, }, connector::{ create_connector_by_url, direct::PeerManagerForDirectConnector, udp_hole_punch::tests::create_mock_peer_manager_with_mock_stun, }, instance::listeners::create_listener_by_url, peers::{ create_packet_recv_chan, peer_conn::tests::set_secure_mode_cfg, peer_manager::RouteAlgoType, peer_rpc::tests::register_service, route_trait::NextHopPolicy, tests::{ connect_peer_manager, create_mock_peer_manager_with_name, wait_route_appear, wait_route_appear_with_cost, }, }, proto::{ common::{CompressionAlgoPb, NatType}, peer_rpc::SecureAuthLevel, }, tunnel::{ TunnelConnector, TunnelListener, common::tests::wait_for_condition, filter::{TunnelWithFilter, tests::DropSendTunnelFilter}, packet_def::{PacketType, ZCPacket}, ring::create_ring_tunnel_pair, }, }; use super::PeerManager; async fn create_lazy_peer_manager() -> Arc { let peer_mgr = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let mut flags = peer_mgr.get_global_ctx().get_flags(); flags.lazy_p2p = true; peer_mgr.get_global_ctx().set_flags(flags); peer_mgr } fn metric_value(peer_mgr: &PeerManager, metric: MetricName, labels: &LabelSet) -> u64 { peer_mgr .get_global_ctx() .stats_manager() .get_metric(metric, labels) .map(|metric| metric.value) .unwrap_or(0) } fn network_labels(peer_mgr: &PeerManager) -> LabelSet { LabelSet::new().with_label_type(LabelType::NetworkName( peer_mgr.get_global_ctx().get_network_name(), )) } #[test] fn recent_traffic_fanout_policy_only_marks_single_peer() { assert!(PeerManager::should_mark_recent_traffic_for_fanout(0)); assert!(PeerManager::should_mark_recent_traffic_for_fanout(1)); assert!(!PeerManager::should_mark_recent_traffic_for_fanout(2)); } #[test] fn gc_recent_traffic_removes_expired_and_connected_entries() { let stale_peer = 1; let direct_peer = 2; let active_peer = 3; let recent_have_traffic = dashmap::DashMap::new(); recent_have_traffic.insert( stale_peer, Instant::now() - PeerManager::RECENT_HAVE_TRAFFIC_TTL - Duration::from_millis(1), ); recent_have_traffic.insert(direct_peer, Instant::now()); recent_have_traffic.insert(active_peer, Instant::now()); let future_peer = 4; recent_have_traffic.insert(future_peer, Instant::now() + Duration::from_secs(1)); PeerManager::gc_recent_traffic_entries(&recent_have_traffic, Instant::now(), |peer_id| { peer_id == direct_peer }); assert!(!recent_have_traffic.contains_key(&stale_peer)); assert!(!recent_have_traffic.contains_key(&direct_peer)); assert!(recent_have_traffic.contains_key(&active_peer)); assert!(recent_have_traffic.contains_key(&future_peer)); } #[tokio::test] async fn recent_traffic_skips_direct_peers_and_clears_after_direct_connect() { let peer_mgr_a = create_lazy_peer_manager().await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_b_id = peer_mgr_b.my_peer_id(); peer_mgr_a.mark_recent_traffic(peer_b_id); assert!(peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now())); let (a_ring, b_ring) = create_ring_tunnel_pair(); let (client_ret, server_ret) = tokio::join!( peer_mgr_a.add_client_tunnel(a_ring, true), peer_mgr_b.add_tunnel_as_server(b_ring, true) ); client_ret.unwrap(); server_ret.unwrap(); wait_for_condition( || { let peer_mgr_a = peer_mgr_a.clone(); async move { peer_mgr_a.has_directly_connected_conn(peer_b_id) } }, Duration::from_secs(5), ) .await; wait_for_condition( || { let peer_mgr_a = peer_mgr_a.clone(); async move { !peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now()) } }, Duration::from_secs(5), ) .await; peer_mgr_a.mark_recent_traffic(peer_b_id); assert!( !peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now()), "directly connected peers should not be tracked as lazy-p2p demand" ); } #[tokio::test] async fn recent_traffic_notifies_only_when_demand_becomes_active() { let peer_mgr_a = create_lazy_peer_manager().await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_b_id = peer_mgr_b.my_peer_id(); let signal = peer_mgr_a.p2p_demand_notify(); let initial_version = signal.version(); peer_mgr_a.mark_recent_traffic(peer_b_id); assert_eq!(signal.version(), initial_version + 1); let first_seen = *peer_mgr_a.recent_have_traffic.get(&peer_b_id).unwrap(); tokio::time::sleep(Duration::from_millis(5)).await; peer_mgr_a.mark_recent_traffic(peer_b_id); assert_eq!( signal.version(), initial_version + 1, "fresh demand should not wake all p2p workers again" ); let refreshed_seen = *peer_mgr_a.recent_have_traffic.get(&peer_b_id).unwrap(); assert!(refreshed_seen > first_seen); if let Some(mut last_seen) = peer_mgr_a.recent_have_traffic.get_mut(&peer_b_id) { *last_seen = Instant::now() - PeerManager::RECENT_HAVE_TRAFFIC_TTL - Duration::from_millis(1); } peer_mgr_a.mark_recent_traffic(peer_b_id); assert_eq!(signal.version(), initial_version + 2); } #[test] fn disable_relay_data_classifies_data_plane_packets_only() { for packet_type in [ PacketType::Data, PacketType::KcpSrc, PacketType::KcpDst, PacketType::QuicSrc, PacketType::QuicDst, PacketType::DataWithKcpSrcModified, PacketType::DataWithQuicSrcModified, PacketType::ForeignNetworkPacket, ] { assert!(PeerManager::is_relay_data_packet(packet_type as u8)); } for packet_type in [ PacketType::RpcReq, PacketType::RpcResp, PacketType::Ping, PacketType::Pong, PacketType::HandShake, PacketType::NoiseHandshakeMsg1, PacketType::NoiseHandshakeMsg2, PacketType::NoiseHandshakeMsg3, PacketType::RelayHandshake, PacketType::RelayHandshakeAck, ] { assert!(!PeerManager::is_relay_data_packet(packet_type as u8)); } } #[test] fn disable_relay_data_inspects_foreign_network_inner_packet_type() { let network_name = "net1".to_string(); let mut rpc_packet = ZCPacket::new_with_payload(b"rpc"); rpc_packet.fill_peer_manager_hdr(1, 2, PacketType::RpcReq as u8); let mut foreign_rpc_packet = ZCPacket::new_for_foreign_network(&network_name, 2, &rpc_packet); foreign_rpc_packet.fill_peer_manager_hdr(10, 20, PacketType::ForeignNetworkPacket as u8); assert_eq!( foreign_rpc_packet.foreign_network_inner_packet_type(), Some(PacketType::RpcReq as u8) ); assert!(!PeerManager::is_relay_data_zc_packet(&foreign_rpc_packet)); let mut data_packet = ZCPacket::new_with_payload(b"data"); data_packet.fill_peer_manager_hdr(1, 2, PacketType::Data as u8); let mut foreign_data_packet = ZCPacket::new_for_foreign_network(&network_name, 2, &data_packet); foreign_data_packet.fill_peer_manager_hdr(10, 20, PacketType::ForeignNetworkPacket as u8); assert_eq!( foreign_data_packet.foreign_network_inner_packet_type(), Some(PacketType::Data as u8) ); assert!(PeerManager::is_relay_data_zc_packet(&foreign_data_packet)); } #[tokio::test] async fn non_whitelisted_network_avoid_relay_survives_disable_relay_data_toggle() { let global_ctx = get_mock_global_ctx(); let mut flags = global_ctx.get_flags(); flags.disable_relay_data = true; flags.relay_network_whitelist = "other-network".to_string(); global_ctx.set_flags(flags); let (packet_send, _packet_recv) = create_packet_recv_chan(); let _peer_mgr = PeerManager::new(RouteAlgoType::Ospf, global_ctx.clone(), packet_send); let mut flags = global_ctx.get_flags(); flags.disable_relay_data = false; global_ctx.set_flags(flags); assert!(global_ctx.get_feature_flags().avoid_relay_data); } #[tokio::test] async fn send_msg_internal_does_not_record_tx_metrics_on_failed_delivery() { let peer_mgr = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let dst_peer_id = peer_mgr.my_peer_id().wrapping_add(1); let network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( peer_mgr.get_global_ctx().get_network_name(), )); let mut pkt = ZCPacket::new_with_payload(b"tx"); pkt.fill_peer_manager_hdr(peer_mgr.my_peer_id(), dst_peer_id, PacketType::Data as u8); let result = PeerManager::send_msg_internal( &peer_mgr.peers, &peer_mgr.foreign_network_client, &peer_mgr.relay_peer_map, Some(&peer_mgr.traffic_metrics), pkt, dst_peer_id, ) .await; assert!(result.is_err()); assert_eq!( peer_mgr .get_global_ctx() .stats_manager() .get_metric(MetricName::TrafficBytesTx, &network_labels) .unwrap() .value, 0 ); assert_eq!( peer_mgr .get_global_ctx() .stats_manager() .get_metric(MetricName::TrafficPacketsTx, &network_labels) .unwrap() .value, 0 ); assert!( peer_mgr .get_global_ctx() .stats_manager() .get_metric( MetricName::TrafficBytesTxByInstance, &network_labels .clone() .with_label_type(LabelType::ToInstanceId("unknown".to_string())), ) .is_none() ); assert!( peer_mgr .get_global_ctx() .stats_manager() .get_metric( MetricName::TrafficPacketsTxByInstance, &network_labels.with_label_type(LabelType::ToInstanceId("unknown".to_string())), ) .is_none() ); } #[tokio::test] async fn send_msg_internal_does_not_record_tx_metrics_for_self_loop() { let (s, _r) = create_packet_recv_chan(); let peer_mgr = Arc::new(PeerManager::new( RouteAlgoType::None, get_mock_global_ctx(), s, )); let dst_peer_id = peer_mgr.my_peer_id(); let network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( peer_mgr.get_global_ctx().get_network_name(), )); let mut pkt = ZCPacket::new_with_payload(b"tx"); pkt.fill_peer_manager_hdr(peer_mgr.my_peer_id(), dst_peer_id, PacketType::Data as u8); PeerManager::send_msg_internal( &peer_mgr.peers, &peer_mgr.foreign_network_client, &peer_mgr.relay_peer_map, Some(&peer_mgr.traffic_metrics), pkt, dst_peer_id, ) .await .unwrap(); assert_eq!( metric_value(&peer_mgr, MetricName::TrafficBytesTx, &network_labels), 0 ); assert_eq!( metric_value(&peer_mgr, MetricName::TrafficPacketsTx, &network_labels), 0 ); assert_eq!( metric_value( &peer_mgr, MetricName::TrafficControlBytesTx, &network_labels ), 0 ); assert_eq!( metric_value( &peer_mgr, MetricName::TrafficControlPacketsTx, &network_labels ), 0 ); assert!( peer_mgr .get_global_ctx() .stats_manager() .get_metric( MetricName::TrafficBytesTxByInstance, &network_labels .clone() .with_label_type(LabelType::ToInstanceId("unknown".to_string())), ) .is_none() ); assert!( peer_mgr .get_global_ctx() .stats_manager() .get_metric( MetricName::TrafficControlBytesTxByInstance, &network_labels.with_label_type(LabelType::ToInstanceId("unknown".to_string())), ) .is_none() ); } #[tokio::test] async fn send_msg_internal_records_data_metrics_for_direct_peer() { let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone()) .await .unwrap(); let a_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( peer_mgr_a.get_global_ctx().get_network_name(), )); let b_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( peer_mgr_b.get_global_ctx().get_network_name(), )); let a_data_tx_before = metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels); let b_data_rx_before = metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels); let mut pkt = ZCPacket::new_with_payload(b"data"); pkt.fill_peer_manager_hdr( peer_mgr_a.my_peer_id(), peer_mgr_b.my_peer_id(), PacketType::Data as u8, ); let pkt_len = pkt.buf_len() as u64; PeerManager::send_msg_internal( &peer_mgr_a.peers, &peer_mgr_a.foreign_network_client, &peer_mgr_a.relay_peer_map, Some(&peer_mgr_a.traffic_metrics), pkt, peer_mgr_b.my_peer_id(), ) .await .unwrap(); wait_for_condition( || { let peer_mgr_a = peer_mgr_a.clone(); let peer_mgr_b = peer_mgr_b.clone(); let a_network_labels = a_network_labels.clone(); let b_network_labels = b_network_labels.clone(); async move { metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels) >= a_data_tx_before + pkt_len && metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels) >= b_data_rx_before + pkt_len } }, Duration::from_secs(5), ) .await; } #[tokio::test] async fn send_msg_internal_records_control_metrics_for_direct_peer() { let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone()) .await .unwrap(); let a_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( peer_mgr_a.get_global_ctx().get_network_name(), )); let b_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( peer_mgr_b.get_global_ctx().get_network_name(), )); let a_control_tx_before = metric_value( &peer_mgr_a, MetricName::TrafficControlBytesTx, &a_network_labels, ); let b_control_rx_before = metric_value( &peer_mgr_b, MetricName::TrafficControlBytesRx, &b_network_labels, ); let a_data_tx_before = metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels); let b_data_rx_before = metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels); let mut pkt = ZCPacket::new_with_payload(b"ctrl"); pkt.fill_peer_manager_hdr( peer_mgr_a.my_peer_id(), peer_mgr_b.my_peer_id(), PacketType::RpcReq as u8, ); let pkt_len = pkt.buf_len() as u64; PeerManager::send_msg_internal( &peer_mgr_a.peers, &peer_mgr_a.foreign_network_client, &peer_mgr_a.relay_peer_map, Some(&peer_mgr_a.traffic_metrics), pkt, peer_mgr_b.my_peer_id(), ) .await .unwrap(); wait_for_condition( || { let peer_mgr_a = peer_mgr_a.clone(); let peer_mgr_b = peer_mgr_b.clone(); let a_network_labels = a_network_labels.clone(); let b_network_labels = b_network_labels.clone(); async move { metric_value( &peer_mgr_a, MetricName::TrafficControlBytesTx, &a_network_labels, ) >= a_control_tx_before + pkt_len && metric_value( &peer_mgr_b, MetricName::TrafficControlBytesRx, &b_network_labels, ) >= b_control_rx_before + pkt_len } }, Duration::from_secs(5), ) .await; assert_eq!( metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels), a_data_tx_before ); assert_eq!( metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels), b_data_rx_before ); } #[tokio::test] async fn send_msg_internal_records_data_forwarded_metrics_for_transit_peer() { let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_c = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.clone()) .await .unwrap(); let b_network_labels = network_labels(&peer_mgr_b); let forwarded_bytes_before = metric_value( &peer_mgr_b, MetricName::TrafficBytesForwarded, &b_network_labels, ); let forwarded_packets_before = metric_value( &peer_mgr_b, MetricName::TrafficPacketsForwarded, &b_network_labels, ); let mut pkt = ZCPacket::new_with_payload(b"forward-data"); pkt.fill_peer_manager_hdr( peer_mgr_a.my_peer_id(), peer_mgr_c.my_peer_id(), PacketType::Data as u8, ); let pkt_len = pkt.buf_len() as u64; PeerManager::send_msg_internal( &peer_mgr_a.peers, &peer_mgr_a.foreign_network_client, &peer_mgr_a.relay_peer_map, Some(&peer_mgr_a.traffic_metrics), pkt, peer_mgr_c.my_peer_id(), ) .await .unwrap(); wait_for_condition( || { let peer_mgr_b = peer_mgr_b.clone(); let b_network_labels = b_network_labels.clone(); async move { metric_value( &peer_mgr_b, MetricName::TrafficBytesForwarded, &b_network_labels, ) >= forwarded_bytes_before + pkt_len && metric_value( &peer_mgr_b, MetricName::TrafficPacketsForwarded, &b_network_labels, ) > forwarded_packets_before } }, Duration::from_secs(5), ) .await; } #[tokio::test] async fn send_msg_internal_records_control_forwarded_metrics_for_transit_peer() { let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_c = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.clone()) .await .unwrap(); let b_network_labels = network_labels(&peer_mgr_b); let forwarded_bytes_before = metric_value( &peer_mgr_b, MetricName::TrafficControlBytesForwarded, &b_network_labels, ); let forwarded_packets_before = metric_value( &peer_mgr_b, MetricName::TrafficControlPacketsForwarded, &b_network_labels, ); let mut pkt = ZCPacket::new_with_payload(b"forward-control"); pkt.fill_peer_manager_hdr( peer_mgr_a.my_peer_id(), peer_mgr_c.my_peer_id(), PacketType::RpcReq as u8, ); let pkt_len = pkt.buf_len() as u64; PeerManager::send_msg_internal( &peer_mgr_a.peers, &peer_mgr_a.foreign_network_client, &peer_mgr_a.relay_peer_map, Some(&peer_mgr_a.traffic_metrics), pkt, peer_mgr_c.my_peer_id(), ) .await .unwrap(); wait_for_condition( || { let peer_mgr_b = peer_mgr_b.clone(); let b_network_labels = b_network_labels.clone(); async move { metric_value( &peer_mgr_b, MetricName::TrafficControlBytesForwarded, &b_network_labels, ) >= forwarded_bytes_before + pkt_len && metric_value( &peer_mgr_b, MetricName::TrafficControlPacketsForwarded, &b_network_labels, ) > forwarded_packets_before } }, Duration::from_secs(5), ) .await; } #[tokio::test] async fn recent_traffic_tolerates_future_timestamps() { let peer_mgr_a = create_lazy_peer_manager().await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_b_id = peer_mgr_b.my_peer_id(); peer_mgr_a .recent_have_traffic .insert(peer_b_id, Instant::now() + Duration::from_secs(1)); assert!(peer_mgr_a.has_recent_traffic(peer_b_id, Instant::now())); peer_mgr_a.mark_recent_traffic(peer_b_id); } #[tokio::test] async fn drop_peer_manager() { let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_c = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_c.clone()).await; wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone()) .await .unwrap(); wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.clone()) .await .unwrap(); // wait mgr_a have 2 peers wait_for_condition( || async { peer_mgr_a.get_peer_map().list_peers_with_conn().await.len() == 2 }, std::time::Duration::from_secs(5), ) .await; drop(peer_mgr_b); wait_for_condition( || async { peer_mgr_a.get_peer_map().list_peers_with_conn().await.len() == 1 }, std::time::Duration::from_secs(5), ) .await; } #[tokio::test] async fn peer_manager_safe_mode_connect_between_peers() { let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; peer_mgr_a .get_global_ctx() .config .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); peer_mgr_b .get_global_ctx() .config .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); set_secure_mode_cfg(&peer_mgr_a.get_global_ctx(), true); set_secure_mode_cfg(&peer_mgr_b.get_global_ctx(), true); let (a_ring, b_ring) = create_ring_tunnel_pair(); let (a_ret, b_ret) = tokio::join!( peer_mgr_a.add_client_tunnel(a_ring, false), peer_mgr_b.add_tunnel_as_server(b_ring, true) ); let (peer_b_id, _) = a_ret.unwrap(); b_ret.unwrap(); wait_for_condition( || { let peer_mgr_a = peer_mgr_a.clone(); async move { if !peer_mgr_a .get_peer_map() .list_peers_with_conn() .await .contains(&peer_b_id) { return false; } let Some(conns) = peer_mgr_a.get_peer_map().list_peer_conns(peer_b_id).await else { return false; }; conns.iter().any(|c| { c.noise_local_static_pubkey.len() == 32 && c.noise_remote_static_pubkey.len() == 32 && c.secure_auth_level == SecureAuthLevel::NetworkSecretConfirmed as i32 }) } }, Duration::from_secs(10), ) .await; let peer_a_id = peer_mgr_a.my_peer_id(); wait_for_condition( || { let peer_mgr_b = peer_mgr_b.clone(); async move { if !peer_mgr_b .get_peer_map() .list_peers_with_conn() .await .contains(&peer_a_id) { return false; } let Some(conns) = peer_mgr_b.get_peer_map().list_peer_conns(peer_a_id).await else { return false; }; conns.iter().any(|c| { c.noise_local_static_pubkey.len() == 32 && c.noise_remote_static_pubkey.len() == 32 && c.secure_auth_level == SecureAuthLevel::NetworkSecretConfirmed as i32 }) } }, Duration::from_secs(10), ) .await; } #[tokio::test] async fn peer_manager_same_network_secure_mode_mismatch_rejected() { let peer_mgr_client = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_server = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; peer_mgr_client .get_global_ctx() .config .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); peer_mgr_server .get_global_ctx() .config .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); set_secure_mode_cfg(&peer_mgr_server.get_global_ctx(), true); let (c_ring, s_ring) = create_ring_tunnel_pair(); let (c_ret, s_ret) = tokio::join!( peer_mgr_client.add_client_tunnel(c_ring, false), peer_mgr_server.add_tunnel_as_server(s_ring, true) ); let _ = c_ret; assert!( s_ret.is_err(), "same-network peer with mismatched secure mode should be rejected" ); wait_for_condition( || { let peer_mgr_server = peer_mgr_server.clone(); async move { peer_mgr_server .get_peer_map() .list_peers_with_conn() .await .is_empty() } }, Duration::from_secs(5), ) .await; } #[tokio::test] async fn credential_node_rejects_legacy_client() { let peer_mgr_client = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_server = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; peer_mgr_client .get_global_ctx() .config .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); peer_mgr_server .get_global_ctx() .config .set_network_identity(NetworkIdentity::new_credential("net1".to_string())); set_secure_mode_cfg(&peer_mgr_server.get_global_ctx(), true); let (c_ring, s_ring) = create_ring_tunnel_pair(); let (c_ret, s_ret) = tokio::join!( peer_mgr_client.add_client_tunnel(c_ring, false), peer_mgr_server.add_tunnel_as_server(s_ring, true) ); let _ = c_ret; assert!( s_ret.is_err(), "credential server should reject legacy client" ); wait_for_condition( || { let peer_mgr_server = peer_mgr_server.clone(); async move { peer_mgr_server .get_peer_map() .list_peers_with_conn() .await .is_empty() } }, Duration::from_secs(5), ) .await; } #[tokio::test] async fn peer_manager_safe_mode_shared_node_pinning_connect() { let peer_mgr_client = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_server = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; peer_mgr_client .get_global_ctx() .config .set_network_identity(NetworkIdentity::new("user".to_string(), "sec1".to_string())); peer_mgr_server .get_global_ctx() .config .set_network_identity(NetworkIdentity { network_name: "shared".to_string(), network_secret: None, network_secret_digest: None, }); set_secure_mode_cfg(&peer_mgr_client.get_global_ctx(), true); set_secure_mode_cfg(&peer_mgr_server.get_global_ctx(), true); let server_pub_b64 = peer_mgr_server .get_global_ctx() .config .get_secure_mode() .unwrap() .local_public_key .unwrap(); let (a_ring, b_ring) = create_ring_tunnel_pair(); let server_remote_url: url::Url = a_ring .info() .unwrap() .remote_addr .unwrap() .url .parse() .unwrap(); peer_mgr_client.get_global_ctx().config.set_peers(vec![ crate::common::config::PeerConfig { uri: server_remote_url, peer_public_key: Some(server_pub_b64.clone()), }, ]); let (c_ret, s_ret) = tokio::join!( peer_mgr_client.add_client_tunnel(a_ring, false), peer_mgr_server.add_tunnel_as_server(b_ring, true) ); c_ret.unwrap(); s_ret.unwrap(); wait_for_condition( || { let peer_mgr_client = peer_mgr_client.clone(); async move { let foreign_peer_map = peer_mgr_client.get_foreign_network_client().get_peer_map(); if foreign_peer_map.list_peers_with_conn().await.len() != 1 { return false; } let Some(peer_id) = foreign_peer_map .list_peers_with_conn() .await .into_iter() .next() else { return false; }; let Some(conns) = foreign_peer_map.list_peer_conns(peer_id).await else { return false; }; conns.iter().any(|c| { c.secure_auth_level == SecureAuthLevel::PeerVerified as i32 && c.noise_local_static_pubkey.len() == 32 && c.noise_remote_static_pubkey.len() == 32 }) } }, Duration::from_secs(10), ) .await; wait_for_condition( || { let peer_mgr_server = peer_mgr_server.clone(); async move { let foreigns = peer_mgr_server .get_foreign_network_manager() .list_foreign_networks() .await; let Some(entry) = foreigns.foreign_networks.get("user") else { return false; }; entry.peers.iter().any(|p| { p.conns .iter() .any(|c| c.noise_local_static_pubkey.len() == 32) }) } }, Duration::from_secs(10), ) .await; } async fn connect_peer_manager_with( client_mgr: Arc, server_mgr: &Arc, mut client: C, server: &mut L, ) { server.listen().await.unwrap(); tokio::spawn(async move { client.set_bind_addrs(vec![]); client_mgr.try_direct_connect(client).await.unwrap(); }); server_mgr .add_client_tunnel(server.accept().await.unwrap(), false) .await .unwrap(); } #[rstest::rstest] #[tokio::test] #[serial_test::serial(forward_packet_test)] async fn forward_packet( #[values("tcp", "udp", "wg", "quic")] proto1: &str, #[values("tcp", "udp", "wg", "quic")] proto2: &str, ) { use crate::proto::{ rpc_impl::RpcController, tests::{GreetingClientFactory, SayHelloRequest}, }; let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; register_service(&peer_mgr_a.peer_rpc_mgr, "", 0, "hello a"); let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_c = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; register_service(&peer_mgr_c.peer_rpc_mgr, "", 0, "hello c"); let mut listener1 = create_listener_by_url( &format!("{}://0.0.0.0:31013", proto1).parse().unwrap(), peer_mgr_b.get_global_ctx(), ) .unwrap(); let connector1 = create_connector_by_url( format!("{}://127.0.0.1:31013", proto1).as_str(), &peer_mgr_a.get_global_ctx(), crate::tunnel::IpVersion::Both, ) .await .unwrap(); connect_peer_manager_with(peer_mgr_a.clone(), &peer_mgr_b, connector1, &mut listener1) .await; wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone()) .await .unwrap(); let mut listener2 = create_listener_by_url( &format!("{}://0.0.0.0:31014", proto2).parse().unwrap(), peer_mgr_c.get_global_ctx(), ) .unwrap(); let connector2 = create_connector_by_url( format!("{}://127.0.0.1:31014", proto2).as_str(), &peer_mgr_b.get_global_ctx(), crate::tunnel::IpVersion::Both, ) .await .unwrap(); connect_peer_manager_with(peer_mgr_b.clone(), &peer_mgr_c, connector2, &mut listener2) .await; wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.clone()) .await .unwrap(); let stub = peer_mgr_a .peer_rpc_mgr .rpc_client() .scoped_client::>( peer_mgr_a.my_peer_id, peer_mgr_c.my_peer_id, "".to_string(), ); let ret = stub .say_hello( RpcController::default(), SayHelloRequest { name: "abc".to_string(), }, ) .await .unwrap(); assert_eq!(ret.greeting, "hello c abc!"); } #[tokio::test] async fn communicate_between_enc_and_non_enc() { let create_mgr = |enable_encryption| async move { let (s, _r) = create_packet_recv_chan(); let mock_global_ctx = get_mock_global_ctx(); mock_global_ctx.set_flags(Flags { enable_encryption, data_compress_algo: CompressionAlgoPb::Zstd.into(), ..Default::default() }); let peer_mgr = Arc::new(PeerManager::new(RouteAlgoType::Ospf, mock_global_ctx, s)); peer_mgr.run().await.unwrap(); peer_mgr }; let peer_mgr_a = create_mgr(true).await; let peer_mgr_b = create_mgr(false).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; // wait 5sec should not crash. tokio::time::sleep(Duration::from_secs(5)).await; // both mgr should alive let mgr_c = create_mgr(true).await; connect_peer_manager(peer_mgr_a.clone(), mgr_c.clone()).await; wait_route_appear(mgr_c, peer_mgr_a).await.unwrap(); let mgr_d = create_mgr(false).await; connect_peer_manager(peer_mgr_b.clone(), mgr_d.clone()).await; wait_route_appear(mgr_d, peer_mgr_b).await.unwrap(); } #[tokio::test] async fn test_avoid_relay_data() { // a->b->c // a->d->e->c let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_c = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_d = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_e = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; println!("peer_mgr_a: {}", peer_mgr_a.my_peer_id); println!("peer_mgr_b: {}", peer_mgr_b.my_peer_id); println!("peer_mgr_c: {}", peer_mgr_c.my_peer_id); println!("peer_mgr_d: {}", peer_mgr_d.my_peer_id); println!("peer_mgr_e: {}", peer_mgr_e.my_peer_id); connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_d.clone()).await; connect_peer_manager(peer_mgr_d.clone(), peer_mgr_e.clone()).await; connect_peer_manager(peer_mgr_e.clone(), peer_mgr_c.clone()).await; // when b's avoid_relay_data is false, a->c should route through b and cost is 2 wait_route_appear_with_cost(peer_mgr_a.clone(), peer_mgr_c.my_peer_id, Some(2)) .await .unwrap(); let ret = peer_mgr_a .get_route() .get_next_hop_with_policy(peer_mgr_c.my_peer_id, NextHopPolicy::LeastCost) .await; assert_eq!(ret, Some(peer_mgr_b.my_peer_id)); // when b's avoid_relay_data is true, a->c should route through d and e, cost is 3 peer_mgr_b .get_global_ctx() .set_avoid_relay_data_preference(true); tokio::time::sleep(Duration::from_secs(2)).await; if wait_route_appear_with_cost(peer_mgr_a.clone(), peer_mgr_c.my_peer_id, Some(3)) .await .is_err() { panic!( "route not appear, a route table: {}, table: {:#?}", peer_mgr_a.get_route().dump().await, peer_mgr_a.get_route().list_routes().await ) } let ret = peer_mgr_a .get_route() .get_next_hop_with_policy(peer_mgr_c.my_peer_id, NextHopPolicy::LeastCost) .await; assert_eq!(ret, Some(peer_mgr_d.my_peer_id)); println!("route table: {:#?}", peer_mgr_a.list_routes().await); // drop e, path should go back to through b drop(peer_mgr_e); wait_route_appear_with_cost(peer_mgr_a.clone(), peer_mgr_c.my_peer_id, Some(2)) .await .unwrap(); let ret = peer_mgr_a .get_route() .get_next_hop_with_policy(peer_mgr_c.my_peer_id, NextHopPolicy::LeastCost) .await; assert_eq!(ret, Some(peer_mgr_b.my_peer_id)); } #[tokio::test] async fn test_client_inbound_blackhole() { let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; // a is client, b is server let (a_ring, b_ring) = create_ring_tunnel_pair(); let a_ring = Box::new(TunnelWithFilter::new( a_ring, DropSendTunnelFilter::new(2, 50000), )); let a_mgr_copy = peer_mgr_a.clone(); tokio::spawn(async move { a_mgr_copy.add_client_tunnel(a_ring, false).await.unwrap(); }); let b_mgr_copy = peer_mgr_b.clone(); tokio::spawn(async move { b_mgr_copy.add_tunnel_as_server(b_ring, true).await.unwrap(); }); wait_for_condition( || async { let peers = peer_mgr_a.list_peers().await; peers.is_empty() }, Duration::from_secs(10), ) .await; } #[tokio::test] async fn close_conn_in_peer_map() { let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone()) .await .unwrap(); let conns = peer_mgr_a .get_peer_map() .list_peer_conns(peer_mgr_b.my_peer_id) .await; assert!(conns.is_some()); let conn_info = conns.as_ref().unwrap().first().unwrap(); peer_mgr_a .close_peer_conn(peer_mgr_b.my_peer_id, &conn_info.conn_id.parse().unwrap()) .await .unwrap(); wait_for_condition( || async { let peers = peer_mgr_a.list_peers().await; peers.is_empty() }, Duration::from_secs(10), ) .await; // a is client, b is server } #[tokio::test] async fn close_conn_in_foreign_network_client() { let peer_mgr_server = create_mock_peer_manager_with_name("server".to_string()).await; let peer_mgr_client = create_mock_peer_manager_with_name("client".to_string()).await; connect_peer_manager(peer_mgr_client.clone(), peer_mgr_server.clone()).await; wait_for_condition( || async { peer_mgr_client .get_foreign_network_client() .list_public_peers() .await .len() == 1 }, Duration::from_secs(3), ) .await; let peer_id = peer_mgr_client .foreign_network_client .list_public_peers() .await[0]; let conns = peer_mgr_client .foreign_network_client .get_peer_map() .list_peer_conns(peer_id) .await; assert!(conns.is_some()); let conn_info = conns.as_ref().unwrap().first().unwrap(); peer_mgr_client .close_peer_conn(peer_id, &conn_info.conn_id.parse().unwrap()) .await .unwrap(); wait_for_condition( || async { peer_mgr_client .get_foreign_network_client() .list_public_peers() .await .is_empty() }, Duration::from_secs(10), ) .await; } #[tokio::test] async fn close_conn_in_foreign_network_manager() { let peer_mgr_server = create_mock_peer_manager_with_name("server".to_string()).await; let peer_mgr_client = create_mock_peer_manager_with_name("client".to_string()).await; connect_peer_manager(peer_mgr_client.clone(), peer_mgr_server.clone()).await; wait_for_condition( || async { peer_mgr_client .get_foreign_network_client() .list_public_peers() .await .len() == 1 }, Duration::from_secs(3), ) .await; let conns = peer_mgr_server .foreign_network_manager .list_foreign_networks() .await; let client_info = conns.foreign_networks["client"].peers[0].clone(); let conn_info = client_info.conns[0].clone(); peer_mgr_server .close_peer_conn(client_info.peer_id, &conn_info.conn_id.parse().unwrap()) .await .unwrap(); wait_for_condition( || async { peer_mgr_client .get_foreign_network_client() .list_public_peers() .await .is_empty() }, Duration::from_secs(10), ) .await; } }