diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 55852c96..ef1dd1a8 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -254,15 +254,16 @@ jobs: run: | mkdir -p ./artifacts/objects/ # windows is the only OS using a different convention for executable file name - if [[ $OS =~ ^windows.*$ && $TARGET =~ ^x86_64.*$ ]]; then + if [[ $OS =~ ^windows.*$ ]]; then SUFFIX=.exe - cp easytier/third_party/x86_64/* ./artifacts/objects/ - elif [[ $OS =~ ^windows.*$ && $TARGET =~ ^i686.*$ ]]; then - SUFFIX=.exe - cp easytier/third_party/i686/* ./artifacts/objects/ - elif [[ $OS =~ ^windows.*$ && $TARGET =~ ^aarch64.*$ ]]; then - SUFFIX=.exe - cp easytier/third_party/arm64/* ./artifacts/objects/ + case $TARGET in + x86_64*) ARCH_DIR=x86_64 ;; + i686*) ARCH_DIR=i686 ;; + aarch64*) ARCH_DIR=arm64 ;; + esac + if [[ -n "$ARCH_DIR" ]]; then + find "easytier/third_party/${ARCH_DIR}" -maxdepth 1 -type f \( -name "*.dll" -o -name "*.sys" \) -exec cp {} ./artifacts/objects/ \; + fi fi if [[ $GITHUB_REF_TYPE =~ ^tag$ ]]; then TAG=$GITHUB_REF_NAME diff --git a/.github/workflows/gui.yml b/.github/workflows/gui.yml index 956f1f60..b685e79b 100644 --- a/.github/workflows/gui.yml +++ b/.github/workflows/gui.yml @@ -169,12 +169,13 @@ jobs: - name: copy correct DLLs if: ${{ matrix.OS == 'windows-latest' }} run: | - if [[ $GUI_TARGET =~ ^aarch64.*$ ]]; then - cp ./easytier/third_party/arm64/* ./easytier-gui/src-tauri/ - elif [[ $GUI_TARGET =~ ^i686.*$ ]]; then - cp ./easytier/third_party/i686/* ./easytier-gui/src-tauri/ - else - cp ./easytier/third_party/x86_64/* ./easytier-gui/src-tauri/ + case $TARGET in + x86_64*) ARCH_DIR=x86_64 ;; + i686*) ARCH_DIR=i686 ;; + aarch64*) ARCH_DIR=arm64 ;; + esac + if [[ -n "$ARCH_DIR" ]]; then + find "./easytier/third_party/${ARCH_DIR}" -maxdepth 1 -type f \( -name "*.dll" -o -name "*.sys" \) -exec cp {} ./easytier-gui/src-tauri/ \; fi - name: Build GUI diff --git a/easytier-gui/src-tauri/src/elevate/macos.rs b/easytier-gui/src-tauri/src/elevate/macos.rs index 18b721d8..fc6c2136 100644 --- a/easytier-gui/src-tauri/src/elevate/macos.rs +++ b/easytier-gui/src-tauri/src/elevate/macos.rs @@ -16,6 +16,8 @@ use super::Command; use anyhow::Result; use std::env; +use std::fs::File; +use std::io::Read as _; use std::path::PathBuf; use std::process::{ExitStatus, Output}; @@ -23,10 +25,12 @@ use std::ffi::{CString, OsString}; use std::io; use std::mem; use std::os::unix::ffi::OsStrExt; +use std::os::unix::io::FromRawFd; +use std::os::unix::process::ExitStatusExt; use std::path::Path; use std::ptr; -use libc::{fcntl, fileno, waitpid, EINTR, F_GETOWN}; +use libc::{fileno, wait, EINTR, SHUT_WR}; use security_framework_sys::authorization::{ errAuthorizationSuccess, kAuthorizationFlagDefaults, kAuthorizationFlagDestroyRights, AuthorizationCreate, AuthorizationExecuteWithPrivileges, AuthorizationFree, AuthorizationRef, @@ -71,7 +75,7 @@ macro_rules! make_cstring { }; } -unsafe fn gui_runas(prog: *const i8, argv: *const *const i8) -> i32 { +unsafe fn gui_runas(prog: *const i8, argv: *const *const i8) -> io::Result { let mut authref: AuthorizationRef = ptr::null_mut(); let mut pipe: *mut libc::FILE = ptr::null_mut(); @@ -82,7 +86,7 @@ unsafe fn gui_runas(prog: *const i8, argv: *const *const i8) -> i32 { &mut authref, ) != errAuthorizationSuccess { - return -1; + return Err(io::Error::last_os_error()); } if AuthorizationExecuteWithPrivileges( authref, @@ -93,22 +97,66 @@ unsafe fn gui_runas(prog: *const i8, argv: *const *const i8) -> i32 { ) != errAuthorizationSuccess { AuthorizationFree(authref, kAuthorizationFlagDestroyRights); - return -1; + return Err(io::Error::last_os_error()); + } + + let fd = fileno(pipe); + if fd == -1 { + AuthorizationFree(authref, kAuthorizationFlagDestroyRights); + return Err(io::Error::last_os_error()); + } + + // We never send input to the elevated GUI. Close the parent write half so + // the child sees EOF on stdin instead of waiting forever. + if libc::shutdown(fd, SHUT_WR) == -1 { + let err = io::Error::last_os_error(); + libc::fclose(pipe); + AuthorizationFree(authref, kAuthorizationFlagDestroyRights); + return Err(err); + } + + // AuthorizationExecuteWithPrivileges wires the tool's stdin/stdout to a + // bidirectional pipe. Drain stdout so the child can't block on a full pipe. + let read_fd = libc::dup(fd); + if read_fd == -1 { + let err = io::Error::last_os_error(); + libc::fclose(pipe); + AuthorizationFree(authref, kAuthorizationFlagDestroyRights); + return Err(err); + } + let mut pipe_file = unsafe { File::from_raw_fd(read_fd) }; + let mut sink = [0_u8; 8192]; + loop { + match pipe_file.read(&mut sink) { + Ok(0) => break, + Ok(_) => {} + Err(err) if err.kind() == io::ErrorKind::Interrupted => continue, + Err(err) => { + libc::fclose(pipe); + AuthorizationFree(authref, kAuthorizationFlagDestroyRights); + return Err(err); + } + } } - let pid = fcntl(fileno(pipe), F_GETOWN, 0); let mut status = 0; loop { - let r = waitpid(pid, &mut status, 0); + let r = wait(&mut status); if r == -1 && io::Error::last_os_error().raw_os_error() == Some(EINTR) { continue; + } else if r == -1 { + let err = io::Error::last_os_error(); + libc::fclose(pipe); + AuthorizationFree(authref, kAuthorizationFlagDestroyRights); + return Err(err); } else { break; } } + libc::fclose(pipe); AuthorizationFree(authref, kAuthorizationFlagDestroyRights); - status + Ok(ExitStatus::from_raw(status)) } fn runas_root_gui(cmd: &Command) -> io::Result { @@ -126,7 +174,7 @@ fn runas_root_gui(cmd: &Command) -> io::Result { let mut argv: Vec<_> = args.iter().map(|x| x.as_ptr()).collect(); argv.push(ptr::null()); - unsafe { Ok(mem::transmute(gui_runas(prog.as_ptr(), argv.as_ptr()))) } + unsafe { gui_runas(prog.as_ptr(), argv.as_ptr()) } } /// The implementation of state check and elevated executing varies on each platform diff --git a/easytier-gui/src-tauri/src/lib.rs b/easytier-gui/src-tauri/src/lib.rs index 007df2f9..3947cb5b 100644 --- a/easytier-gui/src-tauri/src/lib.rs +++ b/easytier-gui/src-tauri/src/lib.rs @@ -146,7 +146,6 @@ async fn collect_network_info( #[tauri::command] async fn set_logging_level(level: String) -> Result<(), String> { - println!("Setting logging level to: {}", level); get_client_manager!()? .set_logging_level(level.clone()) .await diff --git a/easytier/src/common/stats_manager.rs b/easytier/src/common/stats_manager.rs index 8d2ce7e5..38448f8e 100644 --- a/easytier/src/common/stats_manager.rs +++ b/easytier/src/common/stats_manager.rs @@ -24,10 +24,22 @@ pub enum MetricName { /// RPC errors PeerRpcErrors, - /// Traffic bytes sent + /// Data-plane traffic bytes sent TrafficBytesTx, - /// Traffic bytes received + /// Data-plane traffic bytes sent, grouped by destination instance + TrafficBytesTxByInstance, + /// Data-plane traffic bytes received TrafficBytesRx, + /// Data-plane traffic bytes received, grouped by source instance + TrafficBytesRxByInstance, + /// Control-plane traffic bytes sent + TrafficControlBytesTx, + /// Control-plane traffic bytes sent, grouped by destination instance + TrafficControlBytesTxByInstance, + /// Control-plane traffic bytes received + TrafficControlBytesRx, + /// Control-plane traffic bytes received, grouped by source instance + TrafficControlBytesRxByInstance, /// Traffic bytes forwarded TrafficBytesForwarded, /// Traffic bytes sent to self @@ -41,10 +53,22 @@ pub enum MetricName { /// Traffic bytes forwarded for foreign network, forward TrafficBytesForeignForwardForwarded, - /// Traffic packets sent + /// Data-plane traffic packets sent TrafficPacketsTx, - /// Traffic packets received + /// Data-plane traffic packets sent, grouped by destination instance + TrafficPacketsTxByInstance, + /// Data-plane traffic packets received TrafficPacketsRx, + /// Data-plane traffic packets received, grouped by source instance + TrafficPacketsRxByInstance, + /// Control-plane traffic packets sent + TrafficControlPacketsTx, + /// Control-plane traffic packets sent, grouped by destination instance + TrafficControlPacketsTxByInstance, + /// Control-plane traffic packets received + TrafficControlPacketsRx, + /// Control-plane traffic packets received, grouped by source instance + TrafficControlPacketsRxByInstance, /// Traffic packets forwarded TrafficPacketsForwarded, /// Traffic packets sent to self @@ -81,7 +105,17 @@ impl fmt::Display for MetricName { MetricName::PeerRpcErrors => write!(f, "peer_rpc_errors"), MetricName::TrafficBytesTx => write!(f, "traffic_bytes_tx"), + MetricName::TrafficBytesTxByInstance => write!(f, "traffic_bytes_tx_by_instance"), MetricName::TrafficBytesRx => write!(f, "traffic_bytes_rx"), + MetricName::TrafficBytesRxByInstance => write!(f, "traffic_bytes_rx_by_instance"), + MetricName::TrafficControlBytesTx => write!(f, "traffic_control_bytes_tx"), + MetricName::TrafficControlBytesTxByInstance => { + write!(f, "traffic_control_bytes_tx_by_instance") + } + MetricName::TrafficControlBytesRx => write!(f, "traffic_control_bytes_rx"), + MetricName::TrafficControlBytesRxByInstance => { + write!(f, "traffic_control_bytes_rx_by_instance") + } MetricName::TrafficBytesForwarded => write!(f, "traffic_bytes_forwarded"), MetricName::TrafficBytesSelfTx => write!(f, "traffic_bytes_self_tx"), MetricName::TrafficBytesSelfRx => write!(f, "traffic_bytes_self_rx"), @@ -96,7 +130,21 @@ impl fmt::Display for MetricName { } MetricName::TrafficPacketsTx => write!(f, "traffic_packets_tx"), + MetricName::TrafficPacketsTxByInstance => { + write!(f, "traffic_packets_tx_by_instance") + } MetricName::TrafficPacketsRx => write!(f, "traffic_packets_rx"), + MetricName::TrafficPacketsRxByInstance => { + write!(f, "traffic_packets_rx_by_instance") + } + MetricName::TrafficControlPacketsTx => write!(f, "traffic_control_packets_tx"), + MetricName::TrafficControlPacketsTxByInstance => { + write!(f, "traffic_control_packets_tx_by_instance") + } + MetricName::TrafficControlPacketsRx => write!(f, "traffic_control_packets_rx"), + MetricName::TrafficControlPacketsRxByInstance => { + write!(f, "traffic_control_packets_rx_by_instance") + } MetricName::TrafficPacketsForwarded => write!(f, "traffic_packets_forwarded"), MetricName::TrafficPacketsSelfTx => write!(f, "traffic_packets_self_tx"), MetricName::TrafficPacketsSelfRx => write!(f, "traffic_packets_self_rx"), @@ -125,6 +173,10 @@ impl fmt::Display for MetricName { pub enum LabelType { /// Network Name NetworkName(String), + /// Destination instance ID + ToInstanceId(String), + /// Source instance ID + FromInstanceId(String), /// Source peer ID SrcPeerId(u32), /// Destination peer ID @@ -153,6 +205,8 @@ impl fmt::Display for LabelType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { LabelType::NetworkName(name) => write!(f, "network_name={}", name), + LabelType::ToInstanceId(id) => write!(f, "to_instance_id={}", id), + LabelType::FromInstanceId(id) => write!(f, "from_instance_id={}", id), LabelType::SrcPeerId(id) => write!(f, "src_peer_id={}", id), LabelType::DstPeerId(id) => write!(f, "dst_peer_id={}", id), LabelType::ServiceName(name) => write!(f, "service_name={}", name), @@ -172,6 +226,8 @@ impl LabelType { pub fn key(&self) -> &'static str { match self { LabelType::NetworkName(_) => "network_name", + LabelType::ToInstanceId(_) => "to_instance_id", + LabelType::FromInstanceId(_) => "from_instance_id", LabelType::SrcPeerId(_) => "src_peer_id", LabelType::DstPeerId(_) => "dst_peer_id", LabelType::ServiceName(_) => "service_name", @@ -189,6 +245,8 @@ impl LabelType { pub fn value(&self) -> String { match self { LabelType::NetworkName(name) => name.clone(), + LabelType::ToInstanceId(id) => id.clone(), + LabelType::FromInstanceId(id) => id.clone(), LabelType::SrcPeerId(id) => id.to_string(), LabelType::DstPeerId(id) => id.to_string(), LabelType::ServiceName(name) => name.clone(), @@ -677,6 +735,20 @@ mod tests { .with_label("method", "ping"); assert_eq!(labels.to_key(), "method=ping,peer_id=peer1"); + + let instance_labels = LabelSet::new() + .with_label_type(LabelType::NetworkName("default".to_string())) + .with_label_type(LabelType::ToInstanceId( + "87ede5a2-9c3d-492d-9bbe-989b9d07e742".to_string(), + )) + .with_label_type(LabelType::FromInstanceId( + "9b7d4368-b688-4897-a1f4-b6caaed9e8a6".to_string(), + )); + + assert_eq!( + instance_labels.to_key(), + "from_instance_id=9b7d4368-b688-4897-a1f4-b6caaed9e8a6,network_name=default,to_instance_id=87ede5a2-9c3d-492d-9bbe-989b9d07e742" + ); } #[tokio::test] @@ -745,12 +817,24 @@ mod tests { let counter2 = stats.get_counter(MetricName::PeerRpcClientTx, labels); counter2.set(50); + let traffic_labels = LabelSet::new() + .with_label_type(LabelType::NetworkName("default".to_string())) + .with_label_type(LabelType::ToInstanceId( + "87ede5a2-9c3d-492d-9bbe-989b9d07e742".to_string(), + )); + let counter3 = stats.get_counter(MetricName::TrafficBytesTxByInstance, traffic_labels); + counter3.set(25); + let prometheus_output = stats.export_prometheus(); assert!(prometheus_output.contains("# TYPE peer_rpc_client_tx counter")); assert!(prometheus_output.contains("peer_rpc_client_tx{status=\"success\"} 50")); assert!(prometheus_output.contains("# TYPE traffic_bytes_tx counter")); assert!(prometheus_output.contains("traffic_bytes_tx 100")); + assert!(prometheus_output.contains("# TYPE traffic_bytes_tx_by_instance counter")); + assert!(prometheus_output.contains( + "traffic_bytes_tx_by_instance{network_name=\"default\",to_instance_id=\"87ede5a2-9c3d-492d-9bbe-989b9d07e742\"} 25" + )); } #[tokio::test] diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 8c2e25b5..173a978c 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -54,6 +54,10 @@ use super::{ recv_packet_from_chan, relay_peer_map::RelayPeerMap, route_trait::NextHopPolicy, + traffic_metrics::{ + route_peer_info_instance_id, InstanceLabelKind, LogicalTrafficMetrics, + TrafficMetricRecorder, + }, PacketRecvChan, PacketRecvChanReceiver, PUBLIC_SERVER_HOSTNAME_PREFIX, }; @@ -84,6 +88,7 @@ struct ForeignNetworkEntry { peer_center: Arc, stats_mgr: Arc, + traffic_metrics: Arc, tasks: Mutex>, @@ -103,6 +108,7 @@ impl ForeignNetworkEntry { let stats_mgr = global_ctx.stats_manager().clone(); let foreign_global_ctx = Self::build_foreign_global_ctx(&network, global_ctx.clone(), relay_data); + let network_name = network.network_name.clone(); let (packet_sender, packet_recv) = create_packet_recv_chan(); @@ -111,6 +117,58 @@ impl ForeignNetworkEntry { foreign_global_ctx.clone(), my_peer_id, )); + let traffic_metrics = Arc::new(TrafficMetricRecorder::new( + my_peer_id, + Arc::new(LogicalTrafficMetrics::new( + stats_mgr.clone(), + network_name.clone(), + MetricName::TrafficBytesTx, + MetricName::TrafficPacketsTx, + MetricName::TrafficBytesTxByInstance, + MetricName::TrafficPacketsTxByInstance, + InstanceLabelKind::To, + )), + Arc::new(LogicalTrafficMetrics::new( + stats_mgr.clone(), + network_name.clone(), + MetricName::TrafficControlBytesTx, + MetricName::TrafficControlPacketsTx, + MetricName::TrafficControlBytesTxByInstance, + MetricName::TrafficControlPacketsTxByInstance, + InstanceLabelKind::To, + )), + Arc::new(LogicalTrafficMetrics::new( + stats_mgr.clone(), + network_name.clone(), + MetricName::TrafficBytesRx, + MetricName::TrafficPacketsRx, + MetricName::TrafficBytesRxByInstance, + MetricName::TrafficPacketsRxByInstance, + InstanceLabelKind::From, + )), + Arc::new(LogicalTrafficMetrics::new( + stats_mgr.clone(), + network_name.clone(), + MetricName::TrafficControlBytesRx, + MetricName::TrafficControlPacketsRx, + MetricName::TrafficControlBytesRxByInstance, + MetricName::TrafficControlPacketsRxByInstance, + InstanceLabelKind::From, + )), + { + let peer_map = peer_map.clone(); + move |peer_id| { + let peer_map = peer_map.clone(); + async move { + peer_map + .get_route_peer_info(peer_id) + .await + .as_ref() + .and_then(route_peer_info_instance_id) + } + } + }, + )); let relay_peer_map = RelayPeerMap::new( peer_map.clone(), None, @@ -164,6 +222,7 @@ impl ForeignNetworkEntry { bps_limiter, stats_mgr, + traffic_metrics, tasks: Mutex::new(JoinSet::new()), @@ -353,6 +412,7 @@ impl ForeignNetworkEntry { let rpc_sender = self.rpc_sender.clone(); let peer_map = self.peer_map.clone(); let relay_peer_map = self.relay_peer_map.clone(); + let traffic_metrics = self.traffic_metrics.clone(); let relay_data = self.relay_data; let pm_sender = self.pm_packet_sender.lock().await.take().unwrap(); let network_name = self.network.network_name.clone(); @@ -385,7 +445,14 @@ impl ForeignNetworkEntry { let packet_type = hdr.packet_type; let len = hdr.len.get(); let to_peer_id = hdr.to_peer_id.get(); - if to_peer_id == my_node_id { + let is_local_delivery = to_peer_id == my_node_id; + let is_locally_originated = from_peer_id == my_node_id; + if is_local_delivery && !is_locally_originated { + traffic_metrics + .record_rx(from_peer_id, packet_type, buf_len as u64) + .await; + } + if is_local_delivery { if packet_type == PacketType::RelayHandshake as u8 || packet_type == PacketType::RelayHandshakeAck as u8 { @@ -454,6 +521,10 @@ impl ForeignNetworkEntry { ?e, "send packet to foreign peer inside relay peer map failed" ); + } else if is_locally_originated { + traffic_metrics + .record_tx(to_peer_id, packet_type, buf_len as u64) + .await; } } else if let Err(e) = peer_map.send_msg_directly(zc_packet, peer_id).await @@ -462,6 +533,10 @@ impl ForeignNetworkEntry { ?e, "send packet to foreign peer inside peer map failed" ); + } else if is_locally_originated { + traffic_metrics + .record_tx(to_peer_id, packet_type, buf_len as u64) + .await; } } _ => { @@ -478,6 +553,10 @@ impl ForeignNetworkEntry { ); if let Err(e) = pm_sender.send(foreign_packet).await { tracing::error!("send packet to peer with pm failed: {:?}", e); + } else if is_locally_originated { + traffic_metrics + .record_tx(to_peer_id, packet_type, buf_len as u64) + .await; } } }; @@ -917,10 +996,22 @@ impl ForeignNetworkManager { msg: ZCPacket, ) -> Result<(), Error> { if let Some(entry) = self.data.get_network_entry(network_name) { - entry + let packet_type = msg + .peer_manager_header() + .map(|hdr| hdr.packet_type) + .unwrap_or(0); + let msg_len = msg.buf_len() as u64; + let send_result = entry .peer_map .send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop) - .await + .await; + if send_result.is_ok() { + entry + .traffic_metrics + .record_tx(dst_peer_id, packet_type, msg_len) + .await; + } + send_result } else { Err(Error::RouteError(Some("network not found".to_string()))) } @@ -955,6 +1046,7 @@ impl Drop for ForeignNetworkManager { pub mod tests { use crate::{ common::global_ctx::tests::get_mock_global_ctx_with_network, + common::stats_manager::{LabelSet, LabelType, MetricName}, connector::udp_hole_punch::tests::{ create_mock_peer_manager_with_mock_stun, replace_stun_info_collector, }, @@ -965,12 +1057,24 @@ pub mod tests { }, proto::common::NatType, set_global_var, - tunnel::common::tests::wait_for_condition, + tunnel::{ + common::tests::wait_for_condition, + packet_def::{PacketType, ZCPacket}, + }, }; use std::{collections::HashMap, time::Duration}; use super::*; + fn metric_value(peer_mgr: &PeerManager, metric: MetricName, labels: LabelSet) -> u64 { + peer_mgr + .get_global_ctx() + .stats_manager() + .get_metric(metric, &labels) + .map(|metric| metric.value) + .unwrap_or(0) + } + async fn create_mock_peer_manager_for_foreign_network_ext( network: &str, secret: &str, @@ -1052,6 +1156,260 @@ pub mod tests { assert_eq!(2, rpc_resp.foreign_networks["net1"].peers.len()); } + #[tokio::test] + async fn foreign_network_forwarding_records_traffic_metrics() { + let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + + connect_peer_manager(pma_net1.clone(), pm_center.clone()).await; + connect_peer_manager(pmb_net1.clone(), pm_center.clone()).await; + wait_route_appear(pma_net1.clone(), pmb_net1.clone()) + .await + .unwrap(); + + let center_peer_id = pm_center + .get_foreign_network_manager() + .get_network_peer_id("net1") + .unwrap(); + + let mut rx_pkt = ZCPacket::new_with_payload(b"foreign-rx"); + rx_pkt.fill_peer_manager_hdr( + pma_net1.my_peer_id(), + center_peer_id, + PacketType::Data as u8, + ); + pma_net1 + .get_foreign_network_client() + .send_msg(rx_pkt, center_peer_id) + .await + .unwrap(); + + let mut tx_pkt = ZCPacket::new_with_payload(b"foreign-tx"); + tx_pkt.fill_peer_manager_hdr( + center_peer_id, + pmb_net1.my_peer_id(), + PacketType::Data as u8, + ); + pm_center + .get_foreign_network_manager() + .forward_foreign_network_packet("net1", pmb_net1.my_peer_id(), tx_pkt) + .await + .unwrap(); + + let network_labels = + LabelSet::new().with_label_type(LabelType::NetworkName("net1".to_string())); + let tx_instance_labels = network_labels + .clone() + .with_label_type(LabelType::ToInstanceId( + pmb_net1.get_global_ctx().get_id().to_string(), + )); + let rx_instance_labels = network_labels + .clone() + .with_label_type(LabelType::FromInstanceId( + pma_net1.get_global_ctx().get_id().to_string(), + )); + + wait_for_condition( + || { + let pm_center = pm_center.clone(); + let network_labels = network_labels.clone(); + let tx_instance_labels = tx_instance_labels.clone(); + let rx_instance_labels = rx_instance_labels.clone(); + async move { + metric_value( + &pm_center, + MetricName::TrafficBytesTx, + network_labels.clone(), + ) > 0 + && metric_value( + &pm_center, + MetricName::TrafficBytesRx, + network_labels.clone(), + ) > 0 + && metric_value( + &pm_center, + MetricName::TrafficBytesTxByInstance, + tx_instance_labels.clone(), + ) > 0 + && metric_value( + &pm_center, + MetricName::TrafficBytesRxByInstance, + rx_instance_labels.clone(), + ) > 0 + } + }, + Duration::from_secs(5), + ) + .await; + } + + #[tokio::test] + async fn foreign_network_transit_forwarding_only_records_forwarded_metrics() { + let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + + connect_peer_manager(pma_net1.clone(), pm_center.clone()).await; + connect_peer_manager(pmb_net1.clone(), pm_center.clone()).await; + wait_route_appear(pma_net1.clone(), pmb_net1.clone()) + .await + .unwrap(); + + let center_peer_id = pm_center + .get_foreign_network_manager() + .get_network_peer_id("net1") + .unwrap(); + let network_labels = + LabelSet::new().with_label_type(LabelType::NetworkName("net1".to_string())); + let forwarded_bytes_before = metric_value( + &pm_center, + MetricName::TrafficBytesForwarded, + network_labels.clone(), + ); + let rx_bytes_before = metric_value( + &pm_center, + MetricName::TrafficBytesRx, + network_labels.clone(), + ); + let rx_packets_before = metric_value( + &pm_center, + MetricName::TrafficPacketsRx, + network_labels.clone(), + ); + let tx_bytes_before = metric_value( + &pm_center, + MetricName::TrafficBytesTx, + network_labels.clone(), + ); + let tx_packets_before = metric_value( + &pm_center, + MetricName::TrafficPacketsTx, + network_labels.clone(), + ); + + let mut transit_pkt = ZCPacket::new_with_payload(b"foreign-transit"); + transit_pkt.fill_peer_manager_hdr( + pma_net1.my_peer_id(), + pmb_net1.my_peer_id(), + PacketType::Data as u8, + ); + pma_net1 + .get_foreign_network_client() + .send_msg(transit_pkt, center_peer_id) + .await + .unwrap(); + wait_for_condition( + || { + let pm_center = pm_center.clone(); + let network_labels = network_labels.clone(); + async move { + metric_value( + &pm_center, + MetricName::TrafficBytesForwarded, + network_labels.clone(), + ) > forwarded_bytes_before + } + }, + Duration::from_secs(5), + ) + .await; + + assert_eq!( + metric_value( + &pm_center, + MetricName::TrafficBytesRx, + network_labels.clone() + ), + rx_bytes_before + ); + assert_eq!( + metric_value( + &pm_center, + MetricName::TrafficPacketsRx, + network_labels.clone() + ), + rx_packets_before + ); + assert_eq!( + metric_value( + &pm_center, + MetricName::TrafficBytesTx, + network_labels.clone() + ), + tx_bytes_before + ); + assert_eq!( + metric_value(&pm_center, MetricName::TrafficPacketsTx, network_labels), + tx_packets_before + ); + } + + #[tokio::test] + async fn foreign_network_encapsulated_forwarding_records_tx_metrics() { + set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1); + + let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pm_center2 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + + connect_peer_manager(pm_center1.clone(), pm_center2.clone()).await; + + let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + connect_peer_manager(pma_net1.clone(), pm_center1.clone()).await; + connect_peer_manager(pmb_net1.clone(), pm_center2.clone()).await; + wait_route_appear(pma_net1.clone(), pmb_net1.clone()) + .await + .unwrap(); + + let center_peer_id = pm_center1 + .get_foreign_network_manager() + .get_network_peer_id("net1") + .unwrap(); + + let mut encapsulated_tx_pkt = ZCPacket::new_with_payload(b"foreign-encap-tx"); + encapsulated_tx_pkt.fill_peer_manager_hdr( + center_peer_id, + pmb_net1.my_peer_id(), + PacketType::Data as u8, + ); + pma_net1 + .get_foreign_network_client() + .send_msg(encapsulated_tx_pkt, center_peer_id) + .await + .unwrap(); + + let network_labels = + LabelSet::new().with_label_type(LabelType::NetworkName("net1".to_string())); + let tx_instance_labels = network_labels + .clone() + .with_label_type(LabelType::ToInstanceId( + pmb_net1.get_global_ctx().get_id().to_string(), + )); + + wait_for_condition( + || { + let pm_center1 = pm_center1.clone(); + let network_labels = network_labels.clone(); + let tx_instance_labels = tx_instance_labels.clone(); + async move { + metric_value( + &pm_center1, + MetricName::TrafficBytesTx, + network_labels.clone(), + ) > 0 + && metric_value( + &pm_center1, + MetricName::TrafficBytesTxByInstance, + tx_instance_labels.clone(), + ) > 0 + } + }, + Duration::from_secs(5), + ) + .await; + } + #[tokio::test] async fn foreign_network_list_can_include_trusted_keys() { let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; diff --git a/easytier/src/peers/mod.rs b/easytier/src/peers/mod.rs index 6c7a3473..523d6718 100644 --- a/easytier/src/peers/mod.rs +++ b/easytier/src/peers/mod.rs @@ -14,6 +14,7 @@ pub mod peer_session; pub mod relay_peer_map; pub mod route_trait; pub mod rpc_service; +mod traffic_metrics; pub mod foreign_network_client; pub mod foreign_network_manager; diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 36725fd1..0aca7265 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -8,7 +8,6 @@ use std::{ }, }; -use arc_swap::ArcSwapOption; use crossbeam::atomic::AtomicCell; use futures::{StreamExt, TryFutureExt}; @@ -34,7 +33,6 @@ use crate::{ defer, error::Error, global_ctx::ArcGlobalCtx, - stats_manager::{CounterHandle, LabelSet, LabelType, MetricName}, PeerId, }, peers::peer_session::{PeerSessionStore, SessionKey, UpsertResponderSessionReturn}, @@ -59,6 +57,7 @@ use crate::{ use super::{ peer_conn_ping::PeerConnPinger, peer_session::{PeerSession, PeerSessionAction}, + traffic_metrics::AggregateTrafficMetrics, PacketRecvChan, }; @@ -284,13 +283,6 @@ impl PeerConnCloseNotify { } } -struct PeerConnCounter { - traffic_tx_bytes: CounterHandle, - traffic_rx_bytes: CounterHandle, - traffic_tx_packets: CounterHandle, - traffic_rx_packets: CounterHandle, -} - pub struct PeerConn { conn_id: PeerConnId, @@ -323,8 +315,6 @@ pub struct PeerConn { throughput: Arc, loss_rate_stats: Arc, - counters: ArcSwapOption, - peer_session_store: Arc, my_encrypt_algo: String, } @@ -413,8 +403,6 @@ impl PeerConn { throughput, loss_rate_stats: Arc::new(AtomicU32::new(0)), - counters: ArcSwapOption::new(None), - peer_session_store, my_encrypt_algo, } @@ -480,6 +468,7 @@ impl PeerConn { }; *need_retry = true; + let rsp_len = rsp.buf_len() as u64; let Some(peer_mgr_hdr) = rsp.peer_manager_header() else { return Err(Error::WaitRespError(format!( @@ -505,6 +494,8 @@ impl PeerConn { )); } + self.record_control_rx(&rsp.network_name, rsp_len); + Ok(rsp) } @@ -527,7 +518,11 @@ impl PeerConn { .await? } - async fn send_handshake(&self, send_secret_digest: bool) -> Result<(), Error> { + async fn send_handshake( + &self, + send_secret_digest: bool, + metric_network_name: &str, + ) -> Result<(), Error> { let network = self.global_ctx.get_network_identity(); let mut req = HandshakeRequest { magic: MAGIC, @@ -555,11 +550,13 @@ impl PeerConn { PeerId::default(), PacketType::HandShake as u8, ); + let pkt_len = zc_packet.buf_len() as u64; self.sink.send(zc_packet).await.map_err(|e| { tracing::warn!("send handshake request error: {:?}", e); Error::WaitRespError("send handshake request error".to_owned()) })?; + self.record_control_tx(metric_network_name, pkt_len); // yield to send the response packet tokio::task::yield_now().await; @@ -663,6 +660,7 @@ impl PeerConn { pb: Msg, packet_type: PacketType, remote_peer_id: PeerId, + metric_network_name: &str, hs: &mut snow::HandshakeState, ) -> Result<(), Error> { tracing::info!( @@ -679,7 +677,10 @@ impl PeerConn { .map_err(|e| Error::WaitRespError(format!("noise write msg1 failed: {e:?}")))?; let mut pkt = ZCPacket::new_with_payload(&msg[..msg_len]); pkt.fill_peer_manager_hdr(self.my_peer_id, remote_peer_id, packet_type as u8); - Ok(self.sink.send(pkt).await?) + let pkt_len = pkt.buf_len() as u64; + self.sink.send(pkt).await?; + self.record_control_tx(metric_network_name, pkt_len); + Ok(()) } /// Unified remote peer authentication verification. @@ -833,6 +834,7 @@ impl PeerConn { msg1_pb, PacketType::NoiseHandshakeMsg1, PeerId::default(), + &network.network_name, &mut hs, ) .await?; @@ -844,6 +846,7 @@ impl PeerConn { self.recv_next_peer_manager_packet(Some(PacketType::NoiseHandshakeMsg2)), ) .await??; + self.record_control_rx(&network.network_name, msg2.buf_len() as u64); let remote_peer_id = msg2.get_src_peer_id().expect("missing src peer id"); if let Some(hint) = self.peer_id_hint { if hint != remote_peer_id { @@ -896,6 +899,7 @@ impl PeerConn { msg3_pb, PacketType::NoiseHandshakeMsg3, remote_peer_id, + &network.network_name, &mut hs, ) .await?; @@ -1063,6 +1067,7 @@ impl PeerConn { let remote_peer_id = first_msg1 .get_src_peer_id() .expect("msg1 must have src peer id"); + let first_msg1_len = first_msg1.buf_len() as u64; let msg1_pb = Self::decode_handshake_message::( PacketType::NoiseHandshakeMsg1, @@ -1070,6 +1075,7 @@ impl PeerConn { first_msg1, )?; let remote_network_name = msg1_pb.a_network_name.clone(); + self.record_control_rx(&remote_network_name, first_msg1_len); // this may update my peer id handshake_recved(self, &remote_network_name)?; @@ -1122,6 +1128,7 @@ impl PeerConn { msg2_pb, PacketType::NoiseHandshakeMsg2, remote_peer_id, + &remote_network_name, &mut hs, ) .await?; @@ -1133,6 +1140,7 @@ impl PeerConn { self.recv_next_peer_manager_packet(Some(PacketType::NoiseHandshakeMsg3)), ) .await??; + self.record_control_rx(&remote_network_name, msg3_pkt.buf_len() as u64); let msg3_pb = Self::decode_handshake_message::( PacketType::NoiseHandshakeMsg3, Some(&mut hs), @@ -1260,11 +1268,13 @@ impl PeerConn { let rsp = Self::decode_handshake_packet(&first_pkt)?; handshake_recved(self, &rsp.network_name)?; tracing::info!("handshake request: {:?}", rsp); + self.record_control_rx(&rsp.network_name, first_pkt.buf_len() as u64); self.info = Some(rsp); self.is_client = Some(false); let send_digest = self.get_network_identity() == self.global_ctx.get_network_identity(); - self.send_handshake(send_digest).await?; + self.send_handshake(send_digest, &self.get_network_identity().network_name) + .await?; } else { return Err(Error::WaitRespError(format!( "unexpected packet type during handshake: {}", @@ -1296,7 +1306,8 @@ impl PeerConn { self.info = Some(handshake_rsp); self.is_client = Some(true); } else { - self.send_handshake(true).await?; + let network = self.global_ctx.get_network_identity(); + self.send_handshake(true, &network.network_name).await?; tracing::info!("waiting for handshake request from server"); let rsp = self.wait_handshake_loop().await?; tracing::info!("handshake response: {:?}", rsp); @@ -1317,6 +1328,21 @@ impl PeerConn { self.info.is_some() } + fn control_metrics(&self, network_name: &str) -> AggregateTrafficMetrics { + AggregateTrafficMetrics::control( + self.global_ctx.stats_manager().clone(), + network_name.to_string(), + ) + } + + fn record_control_tx(&self, network_name: &str, bytes: u64) { + self.control_metrics(network_name).record_tx(bytes); + } + + fn record_control_rx(&self, network_name: &str, bytes: u64) { + self.control_metrics(network_name).record_rx(bytes); + } + pub async fn start_recv_loop(&mut self, packet_recv_chan: PacketRecvChan) { let mut stream = self.recv.lock().await.take().unwrap(); let sink = self.sink.clone(); @@ -1324,19 +1350,7 @@ impl PeerConn { let close_event_notifier = self.close_event_notifier.clone(); let ctrl_sender = self.ctrl_resp_sender.clone(); let conn_info_for_instrument = self.get_conn_info(); - - let stats_mgr = self.global_ctx.stats_manager(); - let label_set = LabelSet::new().with_label_type(LabelType::NetworkName( - conn_info_for_instrument.network_name.clone(), - )); - let counters = PeerConnCounter { - traffic_tx_bytes: stats_mgr.get_counter(MetricName::TrafficBytesTx, label_set.clone()), - traffic_rx_bytes: stats_mgr.get_counter(MetricName::TrafficBytesRx, label_set.clone()), - traffic_tx_packets: stats_mgr - .get_counter(MetricName::TrafficPacketsTx, label_set.clone()), - traffic_rx_packets: stats_mgr.get_counter(MetricName::TrafficPacketsRx, label_set), - }; - self.counters.store(Some(Arc::new(counters))); + let control_metrics = self.control_metrics(&conn_info_for_instrument.network_name); let is_foreign_network = conn_info_for_instrument.network_name != self.global_ctx.get_network_identity().network_name; @@ -1355,8 +1369,6 @@ impl PeerConn { None }; - let counters = self.counters.load_full().unwrap(); - self.tasks.spawn( async move { tracing::info!("start recving peer conn packet"); @@ -1370,10 +1382,6 @@ impl PeerConn { let mut zc_packet = ret.unwrap(); let buf_len = zc_packet.buf_len() as u64; - - counters.traffic_rx_bytes.add(buf_len); - counters.traffic_rx_packets.inc(); - let Some(peer_mgr_hdr) = zc_packet.mut_peer_manager_header() else { tracing::error!( "unexpected packet: {:?}, cannot decode peer manager hdr", @@ -1383,11 +1391,15 @@ impl PeerConn { }; if peer_mgr_hdr.packet_type == PacketType::Ping as u8 { + control_metrics.record_rx(buf_len); peer_mgr_hdr.packet_type = PacketType::Pong as u8; if let Err(e) = sink.send(zc_packet).await { tracing::error!(?e, "peer conn send req error"); + } else { + control_metrics.record_tx(buf_len); } } else if peer_mgr_hdr.packet_type == PacketType::Pong as u8 { + control_metrics.record_rx(buf_len); if let Err(e) = ctrl_sender.send(zc_packet) { tracing::error!(?e, "peer conn send ctrl resp error"); } @@ -1422,6 +1434,7 @@ impl PeerConn { self.latency_stats.clone(), self.loss_rate_stats.clone(), self.throughput.clone(), + self.control_metrics(&self.get_conn_info().network_name), ); let close_event_notifier = self.close_event_notifier.clone(); @@ -1438,11 +1451,6 @@ impl PeerConn { } pub async fn send_msg(&self, msg: ZCPacket) -> Result<(), Error> { - let counters = self.counters.load(); - if let Some(ref counters) = *counters { - counters.traffic_tx_bytes.add(msg.buf_len() as u64); - counters.traffic_tx_packets.inc(); - } Ok(self.sink.send(msg).await?) } @@ -1545,7 +1553,7 @@ impl Drop for PeerConn { #[cfg(test)] pub mod tests { - use std::sync::Arc; + use std::{sync::Arc, time::Duration}; use rand::rngs::OsRng; @@ -1555,8 +1563,10 @@ pub mod tests { use crate::common::global_ctx::GlobalCtx; use crate::common::new_peer_id; use crate::common::scoped_task::ScopedTask; + use crate::common::stats_manager::{LabelSet, LabelType, MetricName}; use crate::peers::create_packet_recv_chan; use crate::peers::recv_packet_from_chan; + use crate::tunnel::common::tests::wait_for_condition; use crate::tunnel::filter::tests::DropSendTunnelFilter; use crate::tunnel::filter::PacketRecorderTunnelFilter; use crate::tunnel::ring::create_ring_tunnel_pair; @@ -1577,6 +1587,17 @@ pub mod tests { } } + fn metric_value(global_ctx: &GlobalCtx, metric: MetricName, network_name: &str) -> u64 { + global_ctx + .stats_manager() + .get_metric( + metric, + &LabelSet::new().with_label_type(LabelType::NetworkName(network_name.to_string())), + ) + .map(|metric| metric.value) + .unwrap_or(0) + } + #[tokio::test] async fn peer_conn_handshake_same_id() { let ps = Arc::new(PeerSessionStore::new()); @@ -1610,10 +1631,12 @@ pub mod tests { let s_peer_id = new_peer_id(); let ps = Arc::new(PeerSessionStore::new()); + let c_ctx = get_mock_global_ctx(); + let s_ctx = get_mock_global_ctx(); - let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c), ps.clone()); + let mut c_peer = PeerConn::new(c_peer_id, c_ctx.clone(), Box::new(c), ps.clone()); - let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, s_ctx.clone(), Box::new(s), ps.clone()); let (c_ret, s_ret) = tokio::join!( c_peer.do_handshake_as_client(), @@ -1629,6 +1652,47 @@ pub mod tests { assert_eq!(s_recorder.sent.lock().unwrap().len(), 1); assert_eq!(s_recorder.received.lock().unwrap().len(), 1); + assert_eq!( + metric_value(&c_ctx, MetricName::TrafficControlBytesTx, "default"), + c_recorder + .sent + .lock() + .unwrap() + .iter() + .map(|pkt| pkt.buf_len() as u64) + .sum::() + ); + assert_eq!( + metric_value(&c_ctx, MetricName::TrafficControlBytesRx, "default"), + c_recorder + .received + .lock() + .unwrap() + .iter() + .map(|pkt| pkt.buf_len() as u64) + .sum::() + ); + assert_eq!( + metric_value(&s_ctx, MetricName::TrafficControlBytesTx, "default"), + s_recorder + .sent + .lock() + .unwrap() + .iter() + .map(|pkt| pkt.buf_len() as u64) + .sum::() + ); + assert_eq!( + metric_value(&s_ctx, MetricName::TrafficControlBytesRx, "default"), + s_recorder + .received + .lock() + .unwrap() + .iter() + .map(|pkt| pkt.buf_len() as u64) + .sum::() + ); + assert_eq!(c_peer.get_peer_id(), s_peer_id); assert_eq!(s_peer.get_peer_id(), c_peer_id); assert_eq!(c_peer.get_network_identity(), s_peer.get_network_identity()); @@ -1673,6 +1737,47 @@ pub mod tests { c_ret.unwrap(); s_ret.unwrap(); + assert_eq!( + metric_value(&c_ctx, MetricName::TrafficControlBytesTx, "default"), + c_recorder + .sent + .lock() + .unwrap() + .iter() + .map(|pkt| pkt.buf_len() as u64) + .sum::() + ); + assert_eq!( + metric_value(&c_ctx, MetricName::TrafficControlBytesRx, "default"), + c_recorder + .received + .lock() + .unwrap() + .iter() + .map(|pkt| pkt.buf_len() as u64) + .sum::() + ); + assert_eq!( + metric_value(&s_ctx, MetricName::TrafficControlBytesTx, "default"), + s_recorder + .sent + .lock() + .unwrap() + .iter() + .map(|pkt| pkt.buf_len() as u64) + .sum::() + ); + assert_eq!( + metric_value(&s_ctx, MetricName::TrafficControlBytesRx, "default"), + s_recorder + .received + .lock() + .unwrap() + .iter() + .map(|pkt| pkt.buf_len() as u64) + .sum::() + ); + let c_info = c_peer.get_conn_info(); let s_info = s_peer.get_conn_info(); @@ -2071,6 +2176,47 @@ pub mod tests { } } + #[tokio::test] + async fn peer_conn_pingpong_records_control_metrics() { + let (c, s) = create_ring_tunnel_pair(); + + let c_peer_id = new_peer_id(); + let s_peer_id = new_peer_id(); + + let c_ctx = get_mock_global_ctx(); + let s_ctx = get_mock_global_ctx(); + let ps = Arc::new(PeerSessionStore::new()); + let mut c_peer = PeerConn::new(c_peer_id, c_ctx.clone(), Box::new(c), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, s_ctx.clone(), Box::new(s), ps.clone()); + + let (c_ret, s_ret) = tokio::join!( + c_peer.do_handshake_as_client(), + s_peer.do_handshake_as_server() + ); + + assert!(c_ret.is_ok()); + assert!(s_ret.is_ok()); + + s_peer.start_recv_loop(create_packet_recv_chan().0).await; + c_peer.start_pingpong(); + c_peer.start_recv_loop(create_packet_recv_chan().0).await; + + wait_for_condition( + || { + let c_ctx = c_ctx.clone(); + let s_ctx = s_ctx.clone(); + async move { + metric_value(&c_ctx, MetricName::TrafficControlBytesTx, "default") > 0 + && metric_value(&c_ctx, MetricName::TrafficControlBytesRx, "default") > 0 + && metric_value(&s_ctx, MetricName::TrafficControlBytesTx, "default") > 0 + && metric_value(&s_ctx, MetricName::TrafficControlBytesRx, "default") > 0 + } + }, + Duration::from_secs(5), + ) + .await; + } + #[tokio::test] async fn peer_conn_pingpong_timeout_not_close() { peer_conn_pingpong_test_common(3, 5, false, false).await; diff --git a/easytier/src/peers/peer_conn_ping.rs b/easytier/src/peers/peer_conn_ping.rs index 2b90fdd7..538d1f2c 100644 --- a/easytier/src/peers/peer_conn_ping.rs +++ b/easytier/src/peers/peer_conn_ping.rs @@ -16,6 +16,7 @@ use tracing::Instrument; use crate::{ common::{error::Error, PeerId}, + peers::traffic_metrics::AggregateTrafficMetrics, tunnel::{ mpsc::MpscTunnelSender, packet_def::{PacketType, ZCPacket}, @@ -118,6 +119,7 @@ pub struct PeerConnPinger { latency_stats: Arc, loss_rate_stats: Arc, throughput_stats: Arc, + control_metrics: AggregateTrafficMetrics, tasks: JoinSet>, } @@ -131,7 +133,8 @@ impl std::fmt::Debug for PeerConnPinger { } impl PeerConnPinger { - pub fn new( + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( my_peer_id: PeerId, peer_id: PeerId, sink: MpscTunnelSender, @@ -139,6 +142,7 @@ impl PeerConnPinger { latency_stats: Arc, loss_rate_stats: Arc, throughput_stats: Arc, + control_metrics: AggregateTrafficMetrics, ) -> Self { Self { my_peer_id, @@ -149,6 +153,7 @@ impl PeerConnPinger { ctrl_sender, loss_rate_stats, throughput_stats, + control_metrics, } } @@ -162,12 +167,15 @@ impl PeerConnPinger { my_node_id: PeerId, peer_id: PeerId, sink: &MpscTunnelSender, + control_metrics: &AggregateTrafficMetrics, receiver: &mut broadcast::Receiver, seq: u32, ) -> Result { // should add seq here. so latency can be calculated more accurately let req = Self::new_ping_packet(my_node_id, peer_id, seq); + let req_len = req.buf_len() as u64; sink.send(req).await?; + control_metrics.record_tx(req_len); let now = std::time::Instant::now(); // wait until we get a pong packet in ctrl_resp_receiver @@ -214,6 +222,7 @@ impl PeerConnPinger { pub async fn pingpong(&mut self) { let sink = self.sink.clone(); + let control_metrics = self.control_metrics.clone(); let my_node_id = self.my_peer_id; let peer_id = self.peer_id; let latency_stats = self.latency_stats.clone(); @@ -259,6 +268,7 @@ impl PeerConnPinger { ); let sink = sink.clone(); + let control_metrics = control_metrics.clone(); let receiver = ctrl_resp_sender.subscribe(); let ping_res_sender = ping_res_sender.clone(); pingpong_tasks.spawn(async move { @@ -267,6 +277,7 @@ impl PeerConnPinger { my_node_id, peer_id, &sink, + &control_metrics, &mut receiver, req_seq, ) diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 426a4e68..7a2c4b74 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -23,7 +23,7 @@ use crate::{ compressor::{Compressor as _, DefaultCompressor}, constants::EASYTIER_VERSION, error::Error, - global_ctx::{ArcGlobalCtx, NetworkIdentity}, + global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, shrink_dashmap, stats_manager::{CounterHandle, LabelSet, LabelType, MetricName}, stun::StunInfoCollectorTrait, @@ -35,6 +35,10 @@ use crate::{ peer_session::PeerSessionStore, recv_packet_from_chan, route_trait::{ForeignNetworkRouteInfoMap, MockRoute, NextHopPolicy, RouteInterface}, + traffic_metrics::{ + route_peer_info_instance_id, InstanceLabelKind, LogicalTrafficMetrics, + TrafficMetricRecorder, + }, PeerPacketFilter, }, proto::{ @@ -125,6 +129,15 @@ enum RouteAlgoInst { None, } +impl Clone for RouteAlgoInst { + fn clone(&self) -> Self { + match self { + RouteAlgoInst::Ospf(route) => RouteAlgoInst::Ospf(route.clone()), + RouteAlgoInst::None => RouteAlgoInst::None, + } + } +} + struct SelfTxCounters { self_tx_packets: CounterHandle, self_tx_bytes: CounterHandle, @@ -168,6 +181,7 @@ pub struct PeerManager { allow_loopback_tunnel: AtomicBool, self_tx_counters: SelfTxCounters, + traffic_metrics: Arc, peer_session_store: Arc, is_secure_mode_enabled: bool, @@ -300,14 +314,6 @@ impl PeerManager { my_peer_id, )); - let relay_peer_map = RelayPeerMap::new( - peers.clone(), - Some(foreign_network_client.clone()), - global_ctx.clone(), - my_peer_id, - peer_session_store.clone(), - ); - let data_compress_algo = global_ctx .get_flags() .data_compress_algo() @@ -317,28 +323,89 @@ impl PeerManager { let exit_nodes = global_ctx.config.get_exit_nodes(); let stats_manager = global_ctx.stats_manager(); + let network_name = global_ctx.get_network_name(); + let traffic_tx_metrics = Arc::new(LogicalTrafficMetrics::new( + stats_manager.clone(), + network_name.clone(), + MetricName::TrafficBytesTx, + MetricName::TrafficPacketsTx, + MetricName::TrafficBytesTxByInstance, + MetricName::TrafficPacketsTxByInstance, + InstanceLabelKind::To, + )); + let traffic_control_tx_metrics = Arc::new(LogicalTrafficMetrics::new( + stats_manager.clone(), + network_name.clone(), + MetricName::TrafficControlBytesTx, + MetricName::TrafficControlPacketsTx, + MetricName::TrafficControlBytesTxByInstance, + MetricName::TrafficControlPacketsTxByInstance, + InstanceLabelKind::To, + )); + let relay_peer_map = RelayPeerMap::new( + peers.clone(), + Some(foreign_network_client.clone()), + global_ctx.clone(), + my_peer_id, + peer_session_store.clone(), + ); let self_tx_counters = SelfTxCounters { self_tx_packets: stats_manager.get_counter( MetricName::TrafficPacketsSelfTx, - LabelSet::new() - .with_label_type(LabelType::NetworkName(global_ctx.get_network_name())), + LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())), ), self_tx_bytes: stats_manager.get_counter( MetricName::TrafficBytesSelfTx, - LabelSet::new() - .with_label_type(LabelType::NetworkName(global_ctx.get_network_name())), + LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())), ), compress_tx_bytes_before: stats_manager.get_counter( MetricName::CompressionBytesTxBefore, - LabelSet::new() - .with_label_type(LabelType::NetworkName(global_ctx.get_network_name())), + LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())), ), compress_tx_bytes_after: stats_manager.get_counter( MetricName::CompressionBytesTxAfter, - LabelSet::new() - .with_label_type(LabelType::NetworkName(global_ctx.get_network_name())), + LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())), ), }; + let traffic_rx_metrics = Arc::new(LogicalTrafficMetrics::new( + stats_manager.clone(), + network_name, + MetricName::TrafficBytesRx, + MetricName::TrafficPacketsRx, + MetricName::TrafficBytesRxByInstance, + MetricName::TrafficPacketsRxByInstance, + InstanceLabelKind::From, + )); + let traffic_control_rx_metrics = Arc::new(LogicalTrafficMetrics::new( + stats_manager.clone(), + global_ctx.get_network_name(), + MetricName::TrafficControlBytesRx, + MetricName::TrafficControlPacketsRx, + MetricName::TrafficControlBytesRxByInstance, + MetricName::TrafficControlPacketsRxByInstance, + InstanceLabelKind::From, + )); + let route_algo_inst_for_metrics = route_algo_inst.clone(); + let traffic_metrics = Arc::new(TrafficMetricRecorder::new( + my_peer_id, + traffic_tx_metrics, + traffic_control_tx_metrics, + traffic_rx_metrics, + traffic_control_rx_metrics, + move |peer_id| { + let route_algo_inst = route_algo_inst_for_metrics.clone(); + async move { + match &route_algo_inst { + RouteAlgoInst::Ospf(route) => route + .get_peer_info(peer_id) + .await + .as_ref() + .and_then(route_peer_info_instance_id), + RouteAlgoInst::None => None, + } + } + }, + )); PeerManager { my_peer_id, @@ -376,6 +443,7 @@ impl PeerManager { allow_loopback_tunnel: AtomicBool::new(true), self_tx_counters, + traffic_metrics, peer_session_store, is_secure_mode_enabled, @@ -818,6 +886,7 @@ impl PeerManager { stats_mgr.get_counter(MetricName::CompressionBytesRxBefore, label_set.clone()); let compress_rx_bytes_after = stats_mgr.get_counter(MetricName::CompressionBytesRxAfter, label_set.clone()); + let traffic_metrics = self.traffic_metrics.clone(); self.tasks.lock().await.spawn(async move { tracing::trace!("start_peer_recv"); @@ -838,6 +907,8 @@ impl PeerManager { tracing::trace!(?hdr, "peer recv a packet..."); let from_peer_id = hdr.from_peer_id.get(); let to_peer_id = hdr.to_peer_id.get(); + let packet_type = hdr.packet_type; + let is_encrypted = hdr.is_encrypted(); if to_peer_id != my_peer_id { if hdr.forward_counter > 7 { tracing::warn!(?hdr, "forward counter exceed, drop packet"); @@ -846,10 +917,10 @@ impl PeerManager { // Step 10b: credential nodes don't forward handshake packets if is_credential_node - && (hdr.packet_type == PacketType::HandShake as u8 - || hdr.packet_type == PacketType::NoiseHandshakeMsg1 as u8 - || hdr.packet_type == PacketType::NoiseHandshakeMsg2 as u8 - || hdr.packet_type == PacketType::NoiseHandshakeMsg3 as u8) + && (packet_type == PacketType::HandShake as u8 + || packet_type == PacketType::NoiseHandshakeMsg1 as u8 + || packet_type == PacketType::NoiseHandshakeMsg2 as u8 + || packet_type == PacketType::NoiseHandshakeMsg3 as u8) { tracing::debug!("credential node dropping forwarded handshake packet"); continue; @@ -865,9 +936,9 @@ impl PeerManager { if from_peer_id == my_peer_id { compress_tx_bytes_before.add(buf_len as u64); - if hdr.packet_type == PacketType::Data as u8 - || hdr.packet_type == PacketType::KcpSrc as u8 - || hdr.packet_type == PacketType::KcpDst as u8 + if packet_type == PacketType::Data as u8 + || packet_type == PacketType::KcpSrc as u8 + || packet_type == PacketType::KcpDst as u8 { let _ = Self::try_compress_and_encrypt( compress_algo, @@ -887,10 +958,16 @@ impl PeerManager { } tracing::trace!(?to_peer_id, ?my_peer_id, "need forward"); + let tx_metrics = if from_peer_id == my_peer_id { + Some(&traffic_metrics) + } else { + None + }; let ret = Self::send_msg_internal( &peers, &foreign_client, &relay_peer_map, + tx_metrics, ret, to_peer_id, ) @@ -899,8 +976,8 @@ impl PeerManager { tracing::error!(?ret, ?to_peer_id, ?from_peer_id, "forward packet error"); } } else { - if hdr.packet_type == PacketType::RelayHandshake as u8 - || hdr.packet_type == PacketType::RelayHandshakeAck as u8 + if packet_type == PacketType::RelayHandshake as u8 + || packet_type == PacketType::RelayHandshakeAck as u8 { let _ = relay_peer_map.handle_handshake_packet(ret).await; continue; @@ -910,7 +987,7 @@ impl PeerManager { tracing::error!(?e, "decrypt failed"); continue; } - } else if hdr.is_encrypted() { + } else if is_encrypted { match relay_peer_map.decrypt_if_needed(&mut ret).await { Ok(true) => {} Ok(false) => { @@ -926,6 +1003,9 @@ impl PeerManager { self_rx_bytes.add(buf_len as u64); self_rx_packets.inc(); + traffic_metrics + .record_rx(from_peer_id, packet_type, buf_len as u64) + .await; compress_rx_bytes_before.add(buf_len as u64); let compressor = DefaultCompressor {}; @@ -1267,6 +1347,7 @@ impl PeerManager { &self.peers, &self.foreign_network_client, &self.relay_peer_map, + Some(&self.traffic_metrics), msg, dst_peer_id, ) @@ -1282,19 +1363,19 @@ impl PeerManager { peers: &Arc, foreign_network_client: &Arc, relay_peer_map: &Arc, + direct_tx_metrics: Option<&Arc>, msg: ZCPacket, dst_peer_id: PeerId, ) -> Result<(), Error> { let policy = Self::get_next_hop_policy(msg.peer_manager_header().unwrap().is_latency_first()); - - if peers.has_peer(dst_peer_id) { - return peers.send_msg_directly(msg, dst_peer_id).await; + let packet_type = msg.peer_manager_header().unwrap().packet_type; + let msg_len = msg.buf_len() as u64; + let send_result = if peers.has_peer(dst_peer_id) { + peers.send_msg_directly(msg, dst_peer_id).await } else if foreign_network_client.has_next_hop(dst_peer_id) { - return foreign_network_client.send_msg(msg, dst_peer_id).await; - } - - if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy.clone()).await { + foreign_network_client.send_msg(msg, dst_peer_id).await + } else if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy.clone()).await { if peers.has_peer(gateway) || foreign_network_client.has_next_hop(gateway) { relay_peer_map.send_msg(msg, dst_peer_id, policy).await } else { @@ -1311,7 +1392,15 @@ impl PeerManager { } else { tracing::debug!(?dst_peer_id, "no gateway for peer"); Err(Error::RouteError(None)) + }; + + if send_result.is_ok() { + if let Some(metrics) = direct_tx_metrics { + metrics.record_tx(dst_peer_id, packet_type, msg_len).await; + } } + + send_result } pub async fn get_msg_dst_peer(&self, addr: &IpAddr) -> (Vec, bool) { @@ -1454,6 +1543,7 @@ impl PeerManager { &self.peers, &self.foreign_network_client, &self.relay_peer_map, + Some(&self.traffic_metrics), msg, cur_to_peer_id, ) @@ -1533,6 +1623,7 @@ impl PeerManager { &self.peers, &self.foreign_network_client, &self.relay_peer_map, + Some(&self.traffic_metrics), msg, *peer_id, ) @@ -1604,6 +1695,30 @@ impl PeerManager { }); } + async fn run_traffic_metrics_gc_routine(&self) { + let mut event_receiver = self.global_ctx.subscribe(); + let traffic_metrics = self.traffic_metrics.clone(); + self.tasks.lock().await.spawn(async move { + loop { + match event_receiver.recv().await { + Ok(GlobalCtxEvent::PeerRemoved(peer_id)) => { + traffic_metrics.remove_peer(peer_id); + } + Ok(_) => {} + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + tracing::warn!( + skipped, + "traffic metrics GC receiver lagged; clearing peer cache to avoid stale metric attribution" + ); + traffic_metrics.clear_peer_cache(); + event_receiver = event_receiver.resubscribe(); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); + } + async fn run_foriegn_network(&self) { self.peer_rpc_tspt .foreign_peers @@ -1628,6 +1743,7 @@ impl PeerManager { self.run_relay_session_gc_routine().await; self.run_recent_traffic_gc_routine().await; self.run_peer_session_gc_routine().await; + self.run_traffic_metrics_gc_routine().await; self.run_foriegn_network().await; @@ -1867,6 +1983,7 @@ mod tests { common::{ config::Flags, global_ctx::{tests::get_mock_global_ctx, NetworkIdentity}, + stats_manager::{LabelSet, LabelType, MetricName}, }, connector::{ create_connector_by_url, direct::PeerManagerForDirectConnector, @@ -1891,6 +2008,7 @@ mod tests { tunnel::{ common::tests::wait_for_condition, filter::{tests::DropSendTunnelFilter, TunnelWithFilter}, + packet_def::{PacketType, ZCPacket}, ring::create_ring_tunnel_pair, TunnelConnector, TunnelListener, }, @@ -1906,6 +2024,15 @@ mod tests { peer_mgr } + fn metric_value(peer_mgr: &PeerManager, metric: MetricName, labels: &LabelSet) -> u64 { + peer_mgr + .get_global_ctx() + .stats_manager() + .get_metric(metric, labels) + .map(|metric| metric.value) + .unwrap_or(0) + } + #[test] fn recent_traffic_fanout_policy_only_marks_single_peer() { assert!(PeerManager::should_mark_recent_traffic_for_fanout(0)); @@ -2013,6 +2140,277 @@ mod tests { assert_eq!(signal.version(), initial_version + 2); } + #[tokio::test] + async fn send_msg_internal_does_not_record_tx_metrics_on_failed_delivery() { + let peer_mgr = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let dst_peer_id = peer_mgr.my_peer_id().wrapping_add(1); + let network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( + peer_mgr.get_global_ctx().get_network_name(), + )); + + let mut pkt = ZCPacket::new_with_payload(b"tx"); + pkt.fill_peer_manager_hdr(peer_mgr.my_peer_id(), dst_peer_id, PacketType::Data as u8); + + let result = PeerManager::send_msg_internal( + &peer_mgr.peers, + &peer_mgr.foreign_network_client, + &peer_mgr.relay_peer_map, + Some(&peer_mgr.traffic_metrics), + pkt, + dst_peer_id, + ) + .await; + + assert!(result.is_err()); + assert_eq!( + peer_mgr + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficBytesTx, &network_labels) + .unwrap() + .value, + 0 + ); + assert_eq!( + peer_mgr + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficPacketsTx, &network_labels) + .unwrap() + .value, + 0 + ); + assert!(peer_mgr + .get_global_ctx() + .stats_manager() + .get_metric( + MetricName::TrafficBytesTxByInstance, + &network_labels + .clone() + .with_label_type(LabelType::ToInstanceId("unknown".to_string())), + ) + .is_none()); + assert!(peer_mgr + .get_global_ctx() + .stats_manager() + .get_metric( + MetricName::TrafficPacketsTxByInstance, + &network_labels.with_label_type(LabelType::ToInstanceId("unknown".to_string())), + ) + .is_none()); + } + + #[tokio::test] + async fn send_msg_internal_does_not_record_tx_metrics_for_self_loop() { + let (s, _r) = create_packet_recv_chan(); + let peer_mgr = Arc::new(PeerManager::new( + RouteAlgoType::None, + get_mock_global_ctx(), + s, + )); + let dst_peer_id = peer_mgr.my_peer_id(); + let network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( + peer_mgr.get_global_ctx().get_network_name(), + )); + + let mut pkt = ZCPacket::new_with_payload(b"tx"); + pkt.fill_peer_manager_hdr(peer_mgr.my_peer_id(), dst_peer_id, PacketType::Data as u8); + + PeerManager::send_msg_internal( + &peer_mgr.peers, + &peer_mgr.foreign_network_client, + &peer_mgr.relay_peer_map, + Some(&peer_mgr.traffic_metrics), + pkt, + dst_peer_id, + ) + .await + .unwrap(); + + assert_eq!( + metric_value(&peer_mgr, MetricName::TrafficBytesTx, &network_labels), + 0 + ); + assert_eq!( + metric_value(&peer_mgr, MetricName::TrafficPacketsTx, &network_labels), + 0 + ); + assert_eq!( + metric_value( + &peer_mgr, + MetricName::TrafficControlBytesTx, + &network_labels + ), + 0 + ); + assert_eq!( + metric_value( + &peer_mgr, + MetricName::TrafficControlPacketsTx, + &network_labels + ), + 0 + ); + assert!(peer_mgr + .get_global_ctx() + .stats_manager() + .get_metric( + MetricName::TrafficBytesTxByInstance, + &network_labels + .clone() + .with_label_type(LabelType::ToInstanceId("unknown".to_string())), + ) + .is_none()); + assert!(peer_mgr + .get_global_ctx() + .stats_manager() + .get_metric( + MetricName::TrafficControlBytesTxByInstance, + &network_labels.with_label_type(LabelType::ToInstanceId("unknown".to_string())), + ) + .is_none()); + } + + #[tokio::test] + async fn send_msg_internal_records_data_metrics_for_direct_peer() { + let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; + wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone()) + .await + .unwrap(); + + let a_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( + peer_mgr_a.get_global_ctx().get_network_name(), + )); + let b_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( + peer_mgr_b.get_global_ctx().get_network_name(), + )); + + let a_data_tx_before = + metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels); + let b_data_rx_before = + metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels); + let mut pkt = ZCPacket::new_with_payload(b"data"); + pkt.fill_peer_manager_hdr( + peer_mgr_a.my_peer_id(), + peer_mgr_b.my_peer_id(), + PacketType::Data as u8, + ); + let pkt_len = pkt.buf_len() as u64; + + PeerManager::send_msg_internal( + &peer_mgr_a.peers, + &peer_mgr_a.foreign_network_client, + &peer_mgr_a.relay_peer_map, + Some(&peer_mgr_a.traffic_metrics), + pkt, + peer_mgr_b.my_peer_id(), + ) + .await + .unwrap(); + + wait_for_condition( + || { + let peer_mgr_a = peer_mgr_a.clone(); + let peer_mgr_b = peer_mgr_b.clone(); + let a_network_labels = a_network_labels.clone(); + let b_network_labels = b_network_labels.clone(); + async move { + metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels) + >= a_data_tx_before + pkt_len + && metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels) + >= b_data_rx_before + pkt_len + } + }, + Duration::from_secs(5), + ) + .await; + } + + #[tokio::test] + async fn send_msg_internal_records_control_metrics_for_direct_peer() { + let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; + wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone()) + .await + .unwrap(); + + let a_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( + peer_mgr_a.get_global_ctx().get_network_name(), + )); + let b_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName( + peer_mgr_b.get_global_ctx().get_network_name(), + )); + + let a_control_tx_before = metric_value( + &peer_mgr_a, + MetricName::TrafficControlBytesTx, + &a_network_labels, + ); + let b_control_rx_before = metric_value( + &peer_mgr_b, + MetricName::TrafficControlBytesRx, + &b_network_labels, + ); + let a_data_tx_before = + metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels); + let b_data_rx_before = + metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels); + + let mut pkt = ZCPacket::new_with_payload(b"ctrl"); + pkt.fill_peer_manager_hdr( + peer_mgr_a.my_peer_id(), + peer_mgr_b.my_peer_id(), + PacketType::RpcReq as u8, + ); + let pkt_len = pkt.buf_len() as u64; + + PeerManager::send_msg_internal( + &peer_mgr_a.peers, + &peer_mgr_a.foreign_network_client, + &peer_mgr_a.relay_peer_map, + Some(&peer_mgr_a.traffic_metrics), + pkt, + peer_mgr_b.my_peer_id(), + ) + .await + .unwrap(); + + wait_for_condition( + || { + let peer_mgr_a = peer_mgr_a.clone(); + let peer_mgr_b = peer_mgr_b.clone(); + let a_network_labels = a_network_labels.clone(); + let b_network_labels = b_network_labels.clone(); + async move { + metric_value( + &peer_mgr_a, + MetricName::TrafficControlBytesTx, + &a_network_labels, + ) >= a_control_tx_before + pkt_len + && metric_value( + &peer_mgr_b, + MetricName::TrafficControlBytesRx, + &b_network_labels, + ) >= b_control_rx_before + pkt_len + } + }, + Duration::from_secs(5), + ) + .await; + + assert_eq!( + metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels), + a_data_tx_before + ); + assert_eq!( + metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels), + b_data_rx_before + ); + } + #[tokio::test] async fn recent_traffic_tolerates_future_timestamps() { let peer_mgr_a = create_lazy_peer_manager().await; diff --git a/easytier/src/peers/relay_peer_map.rs b/easytier/src/peers/relay_peer_map.rs index bc8e377a..55a40e29 100644 --- a/easytier/src/peers/relay_peer_map.rs +++ b/easytier/src/peers/relay_peer_map.rs @@ -13,6 +13,7 @@ use crate::{ peers::peer_map::PeerMap, peers::peer_session::{PeerSession, PeerSessionAction, PeerSessionStore, SessionKey}, peers::route_trait::NextHopPolicy, + peers::traffic_metrics::AggregateTrafficMetrics, proto::peer_rpc::{PeerConnSessionActionPb, RelayNoiseMsg1Pb, RelayNoiseMsg2Pb}, tunnel::packet_def::{PacketType, ZCPacket}, }; @@ -53,6 +54,7 @@ pub struct RelayPeerMap { pub(crate) pending_packets: DashMap>, is_secure_mode_enabled: bool, + control_metrics: AggregateTrafficMetrics, } impl RelayPeerMap { @@ -69,6 +71,10 @@ impl RelayPeerMap { .map(|cfg| cfg.enabled) .unwrap_or(false); Arc::new(Self { + control_metrics: AggregateTrafficMetrics::control( + global_ctx.stats_manager().clone(), + global_ctx.get_network_name(), + ), peer_map, foreign_network_client, global_ctx, @@ -131,7 +137,10 @@ impl RelayPeerMap { ) -> Result<(), Error> { let mut pkt = ZCPacket::new_with_payload(&payload); pkt.fill_peer_manager_hdr(self.my_peer_id, dst_peer_id, packet_type as u8); - self.send_via_next_hop(pkt, dst_peer_id, policy).await + let pkt_len = pkt.buf_len() as u64; + self.send_via_next_hop(pkt, dst_peer_id, policy).await?; + self.control_metrics.record_tx(pkt_len); + Ok(()) } async fn send_via_next_hop( @@ -467,6 +476,7 @@ impl RelayPeerMap { .peer_manager_header() .ok_or_else(|| Error::RouteError(Some("packet without header".to_string())))?; let src_peer_id = hdr.from_peer_id.get(); + self.control_metrics.record_rx(packet.buf_len() as u64); match hdr.packet_type { x if x == PacketType::RelayHandshake as u8 => { tracing::debug!("handle_relay_msg1 from {:?}", src_peer_id); diff --git a/easytier/src/peers/tests.rs b/easytier/src/peers/tests.rs index 81d303aa..628aebac 100644 --- a/easytier/src/peers/tests.rs +++ b/easytier/src/peers/tests.rs @@ -10,6 +10,7 @@ use crate::{ tests::{get_mock_global_ctx, get_mock_global_ctx_with_network}, NetworkIdentity, }, + stats_manager::{LabelSet, LabelType, MetricName}, PeerId, }, tunnel::{ @@ -101,6 +102,18 @@ pub async fn wait_route_appear( wait_route_appear_with_cost(target_peer, peer_mgr.my_peer_id(), None).await } +fn metric_value(peer_mgr: &PeerManager, metric: MetricName, network_name: &str) -> u64 { + peer_mgr + .get_global_ctx() + .stats_manager() + .get_metric( + metric, + &LabelSet::new().with_label_type(LabelType::NetworkName(network_name.to_string())), + ) + .map(|metric| metric.value) + .unwrap_or(0) +} + #[tokio::test] async fn foreign_mgr_stress_test() { const FOREIGN_NETWORK_COUNT: i32 = 20; @@ -392,6 +405,10 @@ async fn relay_peer_map_real_link_handshake_success() { let peer_a_id = peer_a.my_peer_id(); let peer_b_id = peer_b.my_peer_id(); let peer_c_id = peer_c.my_peer_id(); + let a_control_tx_before = metric_value(&peer_a, MetricName::TrafficControlBytesTx, "net1"); + let a_control_rx_before = metric_value(&peer_a, MetricName::TrafficControlBytesRx, "net1"); + let c_control_tx_before = metric_value(&peer_c, MetricName::TrafficControlBytesTx, "net1"); + let c_control_rx_before = metric_value(&peer_c, MetricName::TrafficControlBytesRx, "net1"); wait_for_condition( || { @@ -459,6 +476,11 @@ async fn relay_peer_map_real_link_handshake_success() { Duration::from_secs(5), ) .await; + + assert!(metric_value(&peer_a, MetricName::TrafficControlBytesTx, "net1") > a_control_tx_before); + assert!(metric_value(&peer_a, MetricName::TrafficControlBytesRx, "net1") > a_control_rx_before); + assert!(metric_value(&peer_c, MetricName::TrafficControlBytesTx, "net1") > c_control_tx_before); + assert!(metric_value(&peer_c, MetricName::TrafficControlBytesRx, "net1") > c_control_rx_before); } #[tokio::test] diff --git a/easytier/src/peers/traffic_metrics.rs b/easytier/src/peers/traffic_metrics.rs new file mode 100644 index 00000000..4812e4cf --- /dev/null +++ b/easytier/src/peers/traffic_metrics.rs @@ -0,0 +1,527 @@ +use std::{future::Future, sync::Arc}; + +use dashmap::DashMap; +use futures::future::BoxFuture; + +use crate::common::{ + shrink_dashmap, + stats_manager::{CounterHandle, LabelSet, LabelType, MetricName, StatsManager}, + PeerId, +}; +use crate::proto::peer_rpc::RoutePeerInfo; +use crate::tunnel::packet_def::PacketType; + +pub(crate) const UNKNOWN_INSTANCE_ID: &str = "unknown"; + +#[derive(Clone, Copy)] +pub(crate) enum InstanceLabelKind { + To, + From, +} + +#[derive(Clone)] +struct TrafficCounters { + bytes: CounterHandle, + packets: CounterHandle, +} + +impl TrafficCounters { + fn add_sample(&self, bytes: u64) { + self.bytes.add(bytes); + self.packets.inc(); + } +} + +#[derive(Clone)] +pub(crate) struct AggregateTrafficMetrics { + tx: TrafficCounters, + rx: TrafficCounters, +} + +impl AggregateTrafficMetrics { + pub(crate) fn control(stats_mgr: Arc, network_name: String) -> Self { + Self::new( + stats_mgr, + network_name, + MetricName::TrafficControlBytesTx, + MetricName::TrafficControlPacketsTx, + MetricName::TrafficControlBytesRx, + MetricName::TrafficControlPacketsRx, + ) + } + + fn new( + stats_mgr: Arc, + network_name: String, + tx_bytes_metric: MetricName, + tx_packets_metric: MetricName, + rx_bytes_metric: MetricName, + rx_packets_metric: MetricName, + ) -> Self { + let label_set = + LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())); + Self { + tx: TrafficCounters { + bytes: stats_mgr.get_counter(tx_bytes_metric, label_set.clone()), + packets: stats_mgr.get_counter(tx_packets_metric, label_set.clone()), + }, + rx: TrafficCounters { + bytes: stats_mgr.get_counter(rx_bytes_metric, label_set.clone()), + packets: stats_mgr.get_counter(rx_packets_metric, label_set), + }, + } + } + + pub(crate) fn record_tx(&self, bytes: u64) { + self.tx.add_sample(bytes); + } + + pub(crate) fn record_rx(&self, bytes: u64) { + self.rx.add_sample(bytes); + } +} + +#[derive(Clone)] +enum CachedPeerTrafficCounters { + Unknown(TrafficCounters), + Resolved(TrafficCounters), +} + +impl CachedPeerTrafficCounters { + fn counters(&self) -> TrafficCounters { + match self { + CachedPeerTrafficCounters::Unknown(counters) + | CachedPeerTrafficCounters::Resolved(counters) => counters.clone(), + } + } + + fn is_resolved(&self) -> bool { + matches!(self, CachedPeerTrafficCounters::Resolved(_)) + } +} + +pub(crate) struct LogicalTrafficMetrics { + stats_mgr: Arc, + network_name: String, + instance_bytes_metric: MetricName, + instance_packets_metric: MetricName, + label_kind: InstanceLabelKind, + total: TrafficCounters, + per_peer: DashMap, +} + +impl LogicalTrafficMetrics { + pub(crate) fn new( + stats_mgr: Arc, + network_name: String, + total_bytes_metric: MetricName, + total_packets_metric: MetricName, + instance_bytes_metric: MetricName, + instance_packets_metric: MetricName, + label_kind: InstanceLabelKind, + ) -> Self { + let label_set = + LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())); + Self { + total: TrafficCounters { + bytes: stats_mgr.get_counter(total_bytes_metric, label_set.clone()), + packets: stats_mgr.get_counter(total_packets_metric, label_set), + }, + stats_mgr, + network_name, + instance_bytes_metric, + instance_packets_metric, + label_kind, + per_peer: DashMap::new(), + } + } + + pub(crate) async fn record_with_resolver( + &self, + peer_id: PeerId, + bytes: u64, + resolver: F, + ) where + F: FnOnce() -> Fut, + Fut: Future>, + { + self.total.add_sample(bytes); + + if let Some(entry) = self.per_peer.get(&peer_id) { + if entry.value().is_resolved() { + entry.value().counters().add_sample(bytes); + return; + } + } + + let resolved_instance_id = resolver().await; + let counters = self.get_or_update_peer_counters(peer_id, resolved_instance_id.as_deref()); + counters.add_sample(bytes); + } + + fn get_or_update_peer_counters( + &self, + peer_id: PeerId, + resolved_instance_id: Option<&str>, + ) -> TrafficCounters { + match self.per_peer.entry(peer_id) { + dashmap::Entry::Occupied(mut entry) => { + if entry.get().is_resolved() || resolved_instance_id.is_none() { + return entry.get().counters(); + } + let counters = self.build_peer_counters(resolved_instance_id.unwrap()); + entry.insert(CachedPeerTrafficCounters::Resolved(counters.clone())); + counters + } + dashmap::Entry::Vacant(entry) => { + let counters = + self.build_peer_counters(resolved_instance_id.unwrap_or(UNKNOWN_INSTANCE_ID)); + let cached = if resolved_instance_id.is_some() { + CachedPeerTrafficCounters::Resolved(counters.clone()) + } else { + CachedPeerTrafficCounters::Unknown(counters.clone()) + }; + entry.insert(cached); + counters + } + } + } + + pub(crate) fn remove_peer(&self, peer_id: PeerId) { + self.per_peer.remove(&peer_id); + shrink_dashmap(&self.per_peer, None); + } + + pub(crate) fn clear_peer_cache(&self) { + self.per_peer.clear(); + shrink_dashmap(&self.per_peer, None); + } + + #[cfg(test)] + fn peer_cache_size(&self) -> usize { + self.per_peer.len() + } + + fn build_peer_counters(&self, instance_id: &str) -> TrafficCounters { + let instance_label = match self.label_kind { + InstanceLabelKind::To => LabelType::ToInstanceId(instance_id.to_string()), + InstanceLabelKind::From => LabelType::FromInstanceId(instance_id.to_string()), + }; + let label_set = LabelSet::new() + .with_label_type(LabelType::NetworkName(self.network_name.clone())) + .with_label_type(instance_label); + TrafficCounters { + bytes: self + .stats_mgr + .get_counter(self.instance_bytes_metric, label_set.clone()), + packets: self + .stats_mgr + .get_counter(self.instance_packets_metric, label_set), + } + } +} + +#[derive(Clone, Copy)] +enum TrafficKind { + Data, + Control, +} + +#[derive(Clone)] +struct TrafficMetricGroup { + data: Arc, + control: Arc, +} + +impl TrafficMetricGroup { + fn select(&self, kind: TrafficKind) -> &Arc { + match kind { + TrafficKind::Data => &self.data, + TrafficKind::Control => &self.control, + } + } +} + +type InstanceIdResolver = dyn Fn(PeerId) -> BoxFuture<'static, Option> + Send + Sync; + +pub(crate) struct TrafficMetricRecorder { + my_peer_id: PeerId, + tx_metrics: TrafficMetricGroup, + rx_metrics: TrafficMetricGroup, + resolve_instance_id: Arc, +} + +impl TrafficMetricRecorder { + pub(crate) fn new( + my_peer_id: PeerId, + tx_data: Arc, + tx_control: Arc, + rx_data: Arc, + rx_control: Arc, + resolve_instance_id: F, + ) -> Self + where + F: Fn(PeerId) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + Self { + my_peer_id, + tx_metrics: TrafficMetricGroup { + data: tx_data, + control: tx_control, + }, + rx_metrics: TrafficMetricGroup { + data: rx_data, + control: rx_control, + }, + resolve_instance_id: Arc::new(move |peer_id| Box::pin(resolve_instance_id(peer_id))), + } + } + + pub(crate) async fn record_tx(&self, peer_id: PeerId, packet_type: u8, bytes: u64) { + if peer_id == self.my_peer_id { + return; + } + self.tx_metrics + .select(Self::traffic_kind(packet_type)) + .record_with_resolver(peer_id, bytes, || self.resolve_instance_id(peer_id)) + .await; + } + + pub(crate) async fn record_rx(&self, peer_id: PeerId, packet_type: u8, bytes: u64) { + if peer_id == self.my_peer_id { + return; + } + self.rx_metrics + .select(Self::traffic_kind(packet_type)) + .record_with_resolver(peer_id, bytes, || self.resolve_instance_id(peer_id)) + .await; + } + + pub(crate) fn remove_peer(&self, peer_id: PeerId) { + self.tx_metrics.data.remove_peer(peer_id); + self.tx_metrics.control.remove_peer(peer_id); + self.rx_metrics.data.remove_peer(peer_id); + self.rx_metrics.control.remove_peer(peer_id); + } + + pub(crate) fn clear_peer_cache(&self) { + self.tx_metrics.data.clear_peer_cache(); + self.tx_metrics.control.clear_peer_cache(); + self.rx_metrics.data.clear_peer_cache(); + self.rx_metrics.control.clear_peer_cache(); + } + + fn resolve_instance_id(&self, peer_id: PeerId) -> BoxFuture<'static, Option> { + (self.resolve_instance_id)(peer_id) + } + + fn traffic_kind(packet_type: u8) -> TrafficKind { + if packet_type == PacketType::Data as u8 + || packet_type == PacketType::KcpSrc as u8 + || packet_type == PacketType::KcpDst as u8 + || packet_type == PacketType::QuicSrc as u8 + || packet_type == PacketType::QuicDst as u8 + || packet_type == PacketType::DataWithKcpSrcModified as u8 + || packet_type == PacketType::DataWithQuicSrcModified as u8 + { + TrafficKind::Data + } else { + TrafficKind::Control + } + } +} + +pub(crate) fn route_peer_info_instance_id(route_peer_info: &RoutePeerInfo) -> Option { + let instance_id = route_peer_info.inst_id.as_ref()?; + let instance_id: uuid::Uuid = (*instance_id).into(); + if instance_id.is_nil() { + None + } else { + Some(instance_id.to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::common::stats_manager::LabelSet; + + fn network_labels(network_name: &str) -> LabelSet { + LabelSet::new().with_label_type(LabelType::NetworkName(network_name.to_string())) + } + + fn to_instance_labels(network_name: &str, instance_id: &str) -> LabelSet { + LabelSet::new() + .with_label_type(LabelType::NetworkName(network_name.to_string())) + .with_label_type(LabelType::ToInstanceId(instance_id.to_string())) + } + + #[tokio::test] + async fn logical_traffic_metrics_upgrade_unknown_instance_label() { + let stats_mgr = Arc::new(StatsManager::new()); + let metrics = LogicalTrafficMetrics::new( + stats_mgr.clone(), + "default".to_string(), + MetricName::TrafficBytesTx, + MetricName::TrafficPacketsTx, + MetricName::TrafficBytesTxByInstance, + MetricName::TrafficPacketsTxByInstance, + InstanceLabelKind::To, + ); + let peer_id = 42; + let resolved_instance_id = "87ede5a2-9c3d-492d-9bbe-989b9d07e742"; + + metrics + .record_with_resolver(peer_id, 100, || async { None }) + .await; + metrics + .record_with_resolver(peer_id, 200, || async { + Some(resolved_instance_id.to_string()) + }) + .await; + + assert_eq!( + stats_mgr + .get_metric(MetricName::TrafficBytesTx, &network_labels("default")) + .unwrap() + .value, + 300 + ); + assert!(stats_mgr + .get_metric( + MetricName::TrafficBytesTx, + &to_instance_labels("default", UNKNOWN_INSTANCE_ID), + ) + .is_none()); + assert!(stats_mgr + .get_metric( + MetricName::TrafficBytesTx, + &to_instance_labels("default", resolved_instance_id), + ) + .is_none()); + assert_eq!( + stats_mgr + .get_metric( + MetricName::TrafficBytesTxByInstance, + &to_instance_labels("default", UNKNOWN_INSTANCE_ID), + ) + .unwrap() + .value, + 100 + ); + assert_eq!( + stats_mgr + .get_metric( + MetricName::TrafficBytesTxByInstance, + &to_instance_labels("default", resolved_instance_id), + ) + .unwrap() + .value, + 200 + ); + assert_eq!( + stats_mgr + .get_metric( + MetricName::TrafficPacketsTxByInstance, + &to_instance_labels("default", UNKNOWN_INSTANCE_ID), + ) + .unwrap() + .value, + 1 + ); + assert_eq!( + stats_mgr + .get_metric( + MetricName::TrafficPacketsTxByInstance, + &to_instance_labels("default", resolved_instance_id), + ) + .unwrap() + .value, + 1 + ); + } + + #[tokio::test] + async fn logical_traffic_metrics_remove_peer_clears_cached_counters() { + let stats_mgr = Arc::new(StatsManager::new()); + let metrics = LogicalTrafficMetrics::new( + stats_mgr.clone(), + "default".to_string(), + MetricName::TrafficBytesTx, + MetricName::TrafficPacketsTx, + MetricName::TrafficBytesTxByInstance, + MetricName::TrafficPacketsTxByInstance, + InstanceLabelKind::To, + ); + let peer_id = 42; + let resolved_instance_id = "87ede5a2-9c3d-492d-9bbe-989b9d07e742"; + + metrics + .record_with_resolver(peer_id, 100, || async { + Some(resolved_instance_id.to_string()) + }) + .await; + metrics.remove_peer(peer_id); + metrics + .record_with_resolver(peer_id, 200, || async { None }) + .await; + + assert_eq!( + stats_mgr + .get_metric(MetricName::TrafficBytesTx, &network_labels("default")) + .unwrap() + .value, + 300 + ); + assert_eq!( + stats_mgr + .get_metric( + MetricName::TrafficBytesTxByInstance, + &to_instance_labels("default", resolved_instance_id), + ) + .unwrap() + .value, + 100 + ); + assert_eq!( + stats_mgr + .get_metric( + MetricName::TrafficBytesTxByInstance, + &to_instance_labels("default", UNKNOWN_INSTANCE_ID), + ) + .unwrap() + .value, + 200 + ); + } + + #[tokio::test] + async fn logical_traffic_metrics_clear_peer_cache_resets_all_cached_peers() { + let stats_mgr = Arc::new(StatsManager::new()); + let metrics = LogicalTrafficMetrics::new( + stats_mgr, + "default".to_string(), + MetricName::TrafficBytesTx, + MetricName::TrafficPacketsTx, + MetricName::TrafficBytesTxByInstance, + MetricName::TrafficPacketsTxByInstance, + InstanceLabelKind::To, + ); + + metrics + .record_with_resolver(1, 100, || async { + Some("87ede5a2-9c3d-492d-9bbe-989b9d07e742".to_string()) + }) + .await; + metrics + .record_with_resolver(2, 200, || async { None }) + .await; + + assert_eq!(metrics.peer_cache_size(), 2); + + metrics.clear_peer_cache(); + + assert_eq!(metrics.peer_cache_size(), 0); + } +} diff --git a/easytier/src/proto/rpc_impl/client.rs b/easytier/src/proto/rpc_impl/client.rs index 402efb1c..5df6a6a5 100644 --- a/easytier/src/proto/rpc_impl/client.rs +++ b/easytier/src/proto/rpc_impl/client.rs @@ -306,7 +306,6 @@ impl Client { accepted_algo: CompressionAlgoPb::Zstd.into(), }, }); - let timeout_dur = std::time::Duration::from_millis(ctrl.timeout_ms() as u64); let mut rpc_packet = timeout(timeout_dur, self.do_rpc(packets, &mut rx)).await??; diff --git a/easytier/src/proto/rpc_impl/server.rs b/easytier/src/proto/rpc_impl/server.rs index 611f64b9..94de54c8 100644 --- a/easytier/src/proto/rpc_impl/server.rs +++ b/easytier/src/proto/rpc_impl/server.rs @@ -301,7 +301,6 @@ impl Server { accepted_algo: CompressionAlgoPb::Zstd.into(), }, }); - for packet in packets { if let Err(err) = sender.send(packet).await { tracing::error!(?err, "Failed to send response packet"); diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index c23b0a77..34067b56 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -19,7 +19,7 @@ use crate::{ common::{ config::{ConfigLoader, NetworkIdentity, PortForwardConfig, TomlConfigLoader}, netns::{NetNS, ROOT_NETNS_NAME}, - stats_manager::{LabelType, MetricName}, + stats_manager::{LabelSet, LabelType, MetricName}, }, instance::instance::Instance, proto::{ @@ -3075,6 +3075,124 @@ pub async fn relay_peer_e2e_encryption(#[values("tcp", "udp")] proto: &str) { drop_insts(insts).await; } +#[tokio::test] +#[serial_test::serial] +pub async fn relay_peer_e2e_encryption_udp() { + let insts = init_three_node_ex( + "udp", + |cfg| { + cfg.set_secure_mode(Some(generate_secure_mode_config())); + cfg + }, + false, + ) + .await; + + let inst1_id = insts[0].get_global_ctx().get_id().to_string(); + let inst3_id = insts[2].get_global_ctx().get_id().to_string(); + let network_name = insts[0].get_global_ctx().get_network_name(); + let total_labels = + LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())); + + wait_for_condition( + || async { + let routes = insts[0].get_peer_manager().list_routes().await; + routes.len() == 2 + }, + Duration::from_secs(10), + ) + .await; + + wait_for_condition( + || async { ping_test("net_a", "10.144.144.3", None).await }, + Duration::from_secs(6), + ) + .await; + + let tx_labels = LabelSet::new() + .with_label_type(LabelType::NetworkName(network_name.clone())) + .with_label_type(LabelType::ToInstanceId(inst3_id.clone())); + let rx_labels = LabelSet::new() + .with_label_type(LabelType::NetworkName(network_name.clone())) + .with_label_type(LabelType::FromInstanceId(inst1_id.clone())); + + wait_for_condition( + || async { + insts[0] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficBytesTx, &tx_labels) + .is_none() + && insts[0] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficPacketsTx, &tx_labels) + .is_none() + && insts[0] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficBytesTx, &total_labels) + .is_some_and(|metric| metric.value > 0) + && insts[0] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficPacketsTx, &total_labels) + .is_some_and(|metric| metric.value > 0) + && insts[0] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficBytesTxByInstance, &tx_labels) + .is_some_and(|metric| metric.value > 0) + && insts[0] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficPacketsTxByInstance, &tx_labels) + .is_some_and(|metric| metric.value > 0) + }, + Duration::from_secs(10), + ) + .await; + + wait_for_condition( + || async { + insts[2] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficBytesRx, &rx_labels) + .is_none() + && insts[2] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficPacketsRx, &rx_labels) + .is_none() + && insts[2] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficBytesRx, &total_labels) + .is_some_and(|metric| metric.value > 0) + && insts[2] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficPacketsRx, &total_labels) + .is_some_and(|metric| metric.value > 0) + && insts[2] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficBytesRxByInstance, &rx_labels) + .is_some_and(|metric| metric.value > 0) + && insts[2] + .get_global_ctx() + .stats_manager() + .get_metric(MetricName::TrafficPacketsRxByInstance, &rx_labels) + .is_some_and(|metric| metric.value > 0) + }, + Duration::from_secs(10), + ) + .await; + + drop_insts(insts).await; +} + /// Test Relay Peer session cleanup on relay failure - TCP #[tokio::test] #[serial_test::serial]