feat(stats): add by-instance traffic metrics (#2011)

This commit is contained in:
KKRainbow
2026-03-26 13:46:33 +08:00
committed by GitHub
parent 8e4dc508bb
commit e000636d83
16 changed files with 1836 additions and 114 deletions
+432 -34
View File
@@ -23,7 +23,7 @@ use crate::{
compressor::{Compressor as _, DefaultCompressor},
constants::EASYTIER_VERSION,
error::Error,
global_ctx::{ArcGlobalCtx, NetworkIdentity},
global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity},
shrink_dashmap,
stats_manager::{CounterHandle, LabelSet, LabelType, MetricName},
stun::StunInfoCollectorTrait,
@@ -35,6 +35,10 @@ use crate::{
peer_session::PeerSessionStore,
recv_packet_from_chan,
route_trait::{ForeignNetworkRouteInfoMap, MockRoute, NextHopPolicy, RouteInterface},
traffic_metrics::{
route_peer_info_instance_id, InstanceLabelKind, LogicalTrafficMetrics,
TrafficMetricRecorder,
},
PeerPacketFilter,
},
proto::{
@@ -125,6 +129,15 @@ enum RouteAlgoInst {
None,
}
impl Clone for RouteAlgoInst {
fn clone(&self) -> Self {
match self {
RouteAlgoInst::Ospf(route) => RouteAlgoInst::Ospf(route.clone()),
RouteAlgoInst::None => RouteAlgoInst::None,
}
}
}
struct SelfTxCounters {
self_tx_packets: CounterHandle,
self_tx_bytes: CounterHandle,
@@ -168,6 +181,7 @@ pub struct PeerManager {
allow_loopback_tunnel: AtomicBool,
self_tx_counters: SelfTxCounters,
traffic_metrics: Arc<TrafficMetricRecorder>,
peer_session_store: Arc<PeerSessionStore>,
is_secure_mode_enabled: bool,
@@ -300,14 +314,6 @@ impl PeerManager {
my_peer_id,
));
let relay_peer_map = RelayPeerMap::new(
peers.clone(),
Some(foreign_network_client.clone()),
global_ctx.clone(),
my_peer_id,
peer_session_store.clone(),
);
let data_compress_algo = global_ctx
.get_flags()
.data_compress_algo()
@@ -317,28 +323,89 @@ impl PeerManager {
let exit_nodes = global_ctx.config.get_exit_nodes();
let stats_manager = global_ctx.stats_manager();
let network_name = global_ctx.get_network_name();
let traffic_tx_metrics = Arc::new(LogicalTrafficMetrics::new(
stats_manager.clone(),
network_name.clone(),
MetricName::TrafficBytesTx,
MetricName::TrafficPacketsTx,
MetricName::TrafficBytesTxByInstance,
MetricName::TrafficPacketsTxByInstance,
InstanceLabelKind::To,
));
let traffic_control_tx_metrics = Arc::new(LogicalTrafficMetrics::new(
stats_manager.clone(),
network_name.clone(),
MetricName::TrafficControlBytesTx,
MetricName::TrafficControlPacketsTx,
MetricName::TrafficControlBytesTxByInstance,
MetricName::TrafficControlPacketsTxByInstance,
InstanceLabelKind::To,
));
let relay_peer_map = RelayPeerMap::new(
peers.clone(),
Some(foreign_network_client.clone()),
global_ctx.clone(),
my_peer_id,
peer_session_store.clone(),
);
let self_tx_counters = SelfTxCounters {
self_tx_packets: stats_manager.get_counter(
MetricName::TrafficPacketsSelfTx,
LabelSet::new()
.with_label_type(LabelType::NetworkName(global_ctx.get_network_name())),
LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())),
),
self_tx_bytes: stats_manager.get_counter(
MetricName::TrafficBytesSelfTx,
LabelSet::new()
.with_label_type(LabelType::NetworkName(global_ctx.get_network_name())),
LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())),
),
compress_tx_bytes_before: stats_manager.get_counter(
MetricName::CompressionBytesTxBefore,
LabelSet::new()
.with_label_type(LabelType::NetworkName(global_ctx.get_network_name())),
LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())),
),
compress_tx_bytes_after: stats_manager.get_counter(
MetricName::CompressionBytesTxAfter,
LabelSet::new()
.with_label_type(LabelType::NetworkName(global_ctx.get_network_name())),
LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone())),
),
};
let traffic_rx_metrics = Arc::new(LogicalTrafficMetrics::new(
stats_manager.clone(),
network_name,
MetricName::TrafficBytesRx,
MetricName::TrafficPacketsRx,
MetricName::TrafficBytesRxByInstance,
MetricName::TrafficPacketsRxByInstance,
InstanceLabelKind::From,
));
let traffic_control_rx_metrics = Arc::new(LogicalTrafficMetrics::new(
stats_manager.clone(),
global_ctx.get_network_name(),
MetricName::TrafficControlBytesRx,
MetricName::TrafficControlPacketsRx,
MetricName::TrafficControlBytesRxByInstance,
MetricName::TrafficControlPacketsRxByInstance,
InstanceLabelKind::From,
));
let route_algo_inst_for_metrics = route_algo_inst.clone();
let traffic_metrics = Arc::new(TrafficMetricRecorder::new(
my_peer_id,
traffic_tx_metrics,
traffic_control_tx_metrics,
traffic_rx_metrics,
traffic_control_rx_metrics,
move |peer_id| {
let route_algo_inst = route_algo_inst_for_metrics.clone();
async move {
match &route_algo_inst {
RouteAlgoInst::Ospf(route) => route
.get_peer_info(peer_id)
.await
.as_ref()
.and_then(route_peer_info_instance_id),
RouteAlgoInst::None => None,
}
}
},
));
PeerManager {
my_peer_id,
@@ -376,6 +443,7 @@ impl PeerManager {
allow_loopback_tunnel: AtomicBool::new(true),
self_tx_counters,
traffic_metrics,
peer_session_store,
is_secure_mode_enabled,
@@ -818,6 +886,7 @@ impl PeerManager {
stats_mgr.get_counter(MetricName::CompressionBytesRxBefore, label_set.clone());
let compress_rx_bytes_after =
stats_mgr.get_counter(MetricName::CompressionBytesRxAfter, label_set.clone());
let traffic_metrics = self.traffic_metrics.clone();
self.tasks.lock().await.spawn(async move {
tracing::trace!("start_peer_recv");
@@ -838,6 +907,8 @@ impl PeerManager {
tracing::trace!(?hdr, "peer recv a packet...");
let from_peer_id = hdr.from_peer_id.get();
let to_peer_id = hdr.to_peer_id.get();
let packet_type = hdr.packet_type;
let is_encrypted = hdr.is_encrypted();
if to_peer_id != my_peer_id {
if hdr.forward_counter > 7 {
tracing::warn!(?hdr, "forward counter exceed, drop packet");
@@ -846,10 +917,10 @@ impl PeerManager {
// Step 10b: credential nodes don't forward handshake packets
if is_credential_node
&& (hdr.packet_type == PacketType::HandShake as u8
|| hdr.packet_type == PacketType::NoiseHandshakeMsg1 as u8
|| hdr.packet_type == PacketType::NoiseHandshakeMsg2 as u8
|| hdr.packet_type == PacketType::NoiseHandshakeMsg3 as u8)
&& (packet_type == PacketType::HandShake as u8
|| packet_type == PacketType::NoiseHandshakeMsg1 as u8
|| packet_type == PacketType::NoiseHandshakeMsg2 as u8
|| packet_type == PacketType::NoiseHandshakeMsg3 as u8)
{
tracing::debug!("credential node dropping forwarded handshake packet");
continue;
@@ -865,9 +936,9 @@ impl PeerManager {
if from_peer_id == my_peer_id {
compress_tx_bytes_before.add(buf_len as u64);
if hdr.packet_type == PacketType::Data as u8
|| hdr.packet_type == PacketType::KcpSrc as u8
|| hdr.packet_type == PacketType::KcpDst as u8
if packet_type == PacketType::Data as u8
|| packet_type == PacketType::KcpSrc as u8
|| packet_type == PacketType::KcpDst as u8
{
let _ = Self::try_compress_and_encrypt(
compress_algo,
@@ -887,10 +958,16 @@ impl PeerManager {
}
tracing::trace!(?to_peer_id, ?my_peer_id, "need forward");
let tx_metrics = if from_peer_id == my_peer_id {
Some(&traffic_metrics)
} else {
None
};
let ret = Self::send_msg_internal(
&peers,
&foreign_client,
&relay_peer_map,
tx_metrics,
ret,
to_peer_id,
)
@@ -899,8 +976,8 @@ impl PeerManager {
tracing::error!(?ret, ?to_peer_id, ?from_peer_id, "forward packet error");
}
} else {
if hdr.packet_type == PacketType::RelayHandshake as u8
|| hdr.packet_type == PacketType::RelayHandshakeAck as u8
if packet_type == PacketType::RelayHandshake as u8
|| packet_type == PacketType::RelayHandshakeAck as u8
{
let _ = relay_peer_map.handle_handshake_packet(ret).await;
continue;
@@ -910,7 +987,7 @@ impl PeerManager {
tracing::error!(?e, "decrypt failed");
continue;
}
} else if hdr.is_encrypted() {
} else if is_encrypted {
match relay_peer_map.decrypt_if_needed(&mut ret).await {
Ok(true) => {}
Ok(false) => {
@@ -926,6 +1003,9 @@ impl PeerManager {
self_rx_bytes.add(buf_len as u64);
self_rx_packets.inc();
traffic_metrics
.record_rx(from_peer_id, packet_type, buf_len as u64)
.await;
compress_rx_bytes_before.add(buf_len as u64);
let compressor = DefaultCompressor {};
@@ -1267,6 +1347,7 @@ impl PeerManager {
&self.peers,
&self.foreign_network_client,
&self.relay_peer_map,
Some(&self.traffic_metrics),
msg,
dst_peer_id,
)
@@ -1282,19 +1363,19 @@ impl PeerManager {
peers: &Arc<PeerMap>,
foreign_network_client: &Arc<ForeignNetworkClient>,
relay_peer_map: &Arc<RelayPeerMap>,
direct_tx_metrics: Option<&Arc<TrafficMetricRecorder>>,
msg: ZCPacket,
dst_peer_id: PeerId,
) -> Result<(), Error> {
let policy =
Self::get_next_hop_policy(msg.peer_manager_header().unwrap().is_latency_first());
if peers.has_peer(dst_peer_id) {
return peers.send_msg_directly(msg, dst_peer_id).await;
let packet_type = msg.peer_manager_header().unwrap().packet_type;
let msg_len = msg.buf_len() as u64;
let send_result = if peers.has_peer(dst_peer_id) {
peers.send_msg_directly(msg, dst_peer_id).await
} else if foreign_network_client.has_next_hop(dst_peer_id) {
return foreign_network_client.send_msg(msg, dst_peer_id).await;
}
if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy.clone()).await {
foreign_network_client.send_msg(msg, dst_peer_id).await
} else if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy.clone()).await {
if peers.has_peer(gateway) || foreign_network_client.has_next_hop(gateway) {
relay_peer_map.send_msg(msg, dst_peer_id, policy).await
} else {
@@ -1311,7 +1392,15 @@ impl PeerManager {
} else {
tracing::debug!(?dst_peer_id, "no gateway for peer");
Err(Error::RouteError(None))
};
if send_result.is_ok() {
if let Some(metrics) = direct_tx_metrics {
metrics.record_tx(dst_peer_id, packet_type, msg_len).await;
}
}
send_result
}
pub async fn get_msg_dst_peer(&self, addr: &IpAddr) -> (Vec<PeerId>, bool) {
@@ -1454,6 +1543,7 @@ impl PeerManager {
&self.peers,
&self.foreign_network_client,
&self.relay_peer_map,
Some(&self.traffic_metrics),
msg,
cur_to_peer_id,
)
@@ -1533,6 +1623,7 @@ impl PeerManager {
&self.peers,
&self.foreign_network_client,
&self.relay_peer_map,
Some(&self.traffic_metrics),
msg,
*peer_id,
)
@@ -1604,6 +1695,30 @@ impl PeerManager {
});
}
async fn run_traffic_metrics_gc_routine(&self) {
let mut event_receiver = self.global_ctx.subscribe();
let traffic_metrics = self.traffic_metrics.clone();
self.tasks.lock().await.spawn(async move {
loop {
match event_receiver.recv().await {
Ok(GlobalCtxEvent::PeerRemoved(peer_id)) => {
traffic_metrics.remove_peer(peer_id);
}
Ok(_) => {}
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!(
skipped,
"traffic metrics GC receiver lagged; clearing peer cache to avoid stale metric attribution"
);
traffic_metrics.clear_peer_cache();
event_receiver = event_receiver.resubscribe();
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
async fn run_foriegn_network(&self) {
self.peer_rpc_tspt
.foreign_peers
@@ -1628,6 +1743,7 @@ impl PeerManager {
self.run_relay_session_gc_routine().await;
self.run_recent_traffic_gc_routine().await;
self.run_peer_session_gc_routine().await;
self.run_traffic_metrics_gc_routine().await;
self.run_foriegn_network().await;
@@ -1867,6 +1983,7 @@ mod tests {
common::{
config::Flags,
global_ctx::{tests::get_mock_global_ctx, NetworkIdentity},
stats_manager::{LabelSet, LabelType, MetricName},
},
connector::{
create_connector_by_url, direct::PeerManagerForDirectConnector,
@@ -1891,6 +2008,7 @@ mod tests {
tunnel::{
common::tests::wait_for_condition,
filter::{tests::DropSendTunnelFilter, TunnelWithFilter},
packet_def::{PacketType, ZCPacket},
ring::create_ring_tunnel_pair,
TunnelConnector, TunnelListener,
},
@@ -1906,6 +2024,15 @@ mod tests {
peer_mgr
}
fn metric_value(peer_mgr: &PeerManager, metric: MetricName, labels: &LabelSet) -> u64 {
peer_mgr
.get_global_ctx()
.stats_manager()
.get_metric(metric, labels)
.map(|metric| metric.value)
.unwrap_or(0)
}
#[test]
fn recent_traffic_fanout_policy_only_marks_single_peer() {
assert!(PeerManager::should_mark_recent_traffic_for_fanout(0));
@@ -2013,6 +2140,277 @@ mod tests {
assert_eq!(signal.version(), initial_version + 2);
}
#[tokio::test]
async fn send_msg_internal_does_not_record_tx_metrics_on_failed_delivery() {
let peer_mgr = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let dst_peer_id = peer_mgr.my_peer_id().wrapping_add(1);
let network_labels = LabelSet::new().with_label_type(LabelType::NetworkName(
peer_mgr.get_global_ctx().get_network_name(),
));
let mut pkt = ZCPacket::new_with_payload(b"tx");
pkt.fill_peer_manager_hdr(peer_mgr.my_peer_id(), dst_peer_id, PacketType::Data as u8);
let result = PeerManager::send_msg_internal(
&peer_mgr.peers,
&peer_mgr.foreign_network_client,
&peer_mgr.relay_peer_map,
Some(&peer_mgr.traffic_metrics),
pkt,
dst_peer_id,
)
.await;
assert!(result.is_err());
assert_eq!(
peer_mgr
.get_global_ctx()
.stats_manager()
.get_metric(MetricName::TrafficBytesTx, &network_labels)
.unwrap()
.value,
0
);
assert_eq!(
peer_mgr
.get_global_ctx()
.stats_manager()
.get_metric(MetricName::TrafficPacketsTx, &network_labels)
.unwrap()
.value,
0
);
assert!(peer_mgr
.get_global_ctx()
.stats_manager()
.get_metric(
MetricName::TrafficBytesTxByInstance,
&network_labels
.clone()
.with_label_type(LabelType::ToInstanceId("unknown".to_string())),
)
.is_none());
assert!(peer_mgr
.get_global_ctx()
.stats_manager()
.get_metric(
MetricName::TrafficPacketsTxByInstance,
&network_labels.with_label_type(LabelType::ToInstanceId("unknown".to_string())),
)
.is_none());
}
#[tokio::test]
async fn send_msg_internal_does_not_record_tx_metrics_for_self_loop() {
let (s, _r) = create_packet_recv_chan();
let peer_mgr = Arc::new(PeerManager::new(
RouteAlgoType::None,
get_mock_global_ctx(),
s,
));
let dst_peer_id = peer_mgr.my_peer_id();
let network_labels = LabelSet::new().with_label_type(LabelType::NetworkName(
peer_mgr.get_global_ctx().get_network_name(),
));
let mut pkt = ZCPacket::new_with_payload(b"tx");
pkt.fill_peer_manager_hdr(peer_mgr.my_peer_id(), dst_peer_id, PacketType::Data as u8);
PeerManager::send_msg_internal(
&peer_mgr.peers,
&peer_mgr.foreign_network_client,
&peer_mgr.relay_peer_map,
Some(&peer_mgr.traffic_metrics),
pkt,
dst_peer_id,
)
.await
.unwrap();
assert_eq!(
metric_value(&peer_mgr, MetricName::TrafficBytesTx, &network_labels),
0
);
assert_eq!(
metric_value(&peer_mgr, MetricName::TrafficPacketsTx, &network_labels),
0
);
assert_eq!(
metric_value(
&peer_mgr,
MetricName::TrafficControlBytesTx,
&network_labels
),
0
);
assert_eq!(
metric_value(
&peer_mgr,
MetricName::TrafficControlPacketsTx,
&network_labels
),
0
);
assert!(peer_mgr
.get_global_ctx()
.stats_manager()
.get_metric(
MetricName::TrafficBytesTxByInstance,
&network_labels
.clone()
.with_label_type(LabelType::ToInstanceId("unknown".to_string())),
)
.is_none());
assert!(peer_mgr
.get_global_ctx()
.stats_manager()
.get_metric(
MetricName::TrafficControlBytesTxByInstance,
&network_labels.with_label_type(LabelType::ToInstanceId("unknown".to_string())),
)
.is_none());
}
#[tokio::test]
async fn send_msg_internal_records_data_metrics_for_direct_peer() {
let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone())
.await
.unwrap();
let a_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName(
peer_mgr_a.get_global_ctx().get_network_name(),
));
let b_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName(
peer_mgr_b.get_global_ctx().get_network_name(),
));
let a_data_tx_before =
metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels);
let b_data_rx_before =
metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels);
let mut pkt = ZCPacket::new_with_payload(b"data");
pkt.fill_peer_manager_hdr(
peer_mgr_a.my_peer_id(),
peer_mgr_b.my_peer_id(),
PacketType::Data as u8,
);
let pkt_len = pkt.buf_len() as u64;
PeerManager::send_msg_internal(
&peer_mgr_a.peers,
&peer_mgr_a.foreign_network_client,
&peer_mgr_a.relay_peer_map,
Some(&peer_mgr_a.traffic_metrics),
pkt,
peer_mgr_b.my_peer_id(),
)
.await
.unwrap();
wait_for_condition(
|| {
let peer_mgr_a = peer_mgr_a.clone();
let peer_mgr_b = peer_mgr_b.clone();
let a_network_labels = a_network_labels.clone();
let b_network_labels = b_network_labels.clone();
async move {
metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels)
>= a_data_tx_before + pkt_len
&& metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels)
>= b_data_rx_before + pkt_len
}
},
Duration::from_secs(5),
)
.await;
}
#[tokio::test]
async fn send_msg_internal_records_control_metrics_for_direct_peer() {
let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone())
.await
.unwrap();
let a_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName(
peer_mgr_a.get_global_ctx().get_network_name(),
));
let b_network_labels = LabelSet::new().with_label_type(LabelType::NetworkName(
peer_mgr_b.get_global_ctx().get_network_name(),
));
let a_control_tx_before = metric_value(
&peer_mgr_a,
MetricName::TrafficControlBytesTx,
&a_network_labels,
);
let b_control_rx_before = metric_value(
&peer_mgr_b,
MetricName::TrafficControlBytesRx,
&b_network_labels,
);
let a_data_tx_before =
metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels);
let b_data_rx_before =
metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels);
let mut pkt = ZCPacket::new_with_payload(b"ctrl");
pkt.fill_peer_manager_hdr(
peer_mgr_a.my_peer_id(),
peer_mgr_b.my_peer_id(),
PacketType::RpcReq as u8,
);
let pkt_len = pkt.buf_len() as u64;
PeerManager::send_msg_internal(
&peer_mgr_a.peers,
&peer_mgr_a.foreign_network_client,
&peer_mgr_a.relay_peer_map,
Some(&peer_mgr_a.traffic_metrics),
pkt,
peer_mgr_b.my_peer_id(),
)
.await
.unwrap();
wait_for_condition(
|| {
let peer_mgr_a = peer_mgr_a.clone();
let peer_mgr_b = peer_mgr_b.clone();
let a_network_labels = a_network_labels.clone();
let b_network_labels = b_network_labels.clone();
async move {
metric_value(
&peer_mgr_a,
MetricName::TrafficControlBytesTx,
&a_network_labels,
) >= a_control_tx_before + pkt_len
&& metric_value(
&peer_mgr_b,
MetricName::TrafficControlBytesRx,
&b_network_labels,
) >= b_control_rx_before + pkt_len
}
},
Duration::from_secs(5),
)
.await;
assert_eq!(
metric_value(&peer_mgr_a, MetricName::TrafficBytesTx, &a_network_labels),
a_data_tx_before
);
assert_eq!(
metric_value(&peer_mgr_b, MetricName::TrafficBytesRx, &b_network_labels),
b_data_rx_before
);
}
#[tokio::test]
async fn recent_traffic_tolerates_future_timestamps() {
let peer_mgr_a = create_lazy_peer_manager().await;