From ffe5644ddc7ec513fd35d55b40a744bbcd56bb29 Mon Sep 17 00:00:00 2001 From: KKRainbow <443152178@qq.com> Date: Thu, 29 Jan 2026 16:12:26 +0800 Subject: [PATCH] add token bucket limiter on peer conn recv (#1842) We should limit peer conn recv to make sure we don't recv too much from peers. --- easytier/src/common/token_bucket.rs | 19 +++++++++++++ easytier/src/peers/foreign_network_manager.rs | 4 +-- easytier/src/peers/peer_conn.rs | 28 +++++++++++++++++-- easytier/src/peers/peer_ospf_route.rs | 4 +-- easytier/src/tests/three_node.rs | 7 ++++- 5 files changed, 54 insertions(+), 8 deletions(-) diff --git a/easytier/src/common/token_bucket.rs b/easytier/src/common/token_bucket.rs index ef81b8de..9c706b2f 100644 --- a/easytier/src/common/token_bucket.rs +++ b/easytier/src/common/token_bucket.rs @@ -3,6 +3,7 @@ use dashmap::DashMap; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use tokio::sync::Notify; use tokio::time; use crate::common::scoped_task::ScopedTask; @@ -15,6 +16,8 @@ pub struct TokenBucket { config: BucketConfig, // Immutable configuration refill_task: Mutex>>, // Background refill task start_time: Instant, // Bucket creation time + + refill_notifier: Arc, } #[derive(Clone, Copy)] @@ -64,11 +67,13 @@ impl TokenBucket { config, refill_task: Mutex::new(None), start_time: std::time::Instant::now(), + refill_notifier: Arc::new(Notify::new()), }); // Start background refill task let weak_bucket = Arc::downgrade(&arc_self); let refill_interval = arc_self.config.refill_interval; + let refill_notifer = arc_self.refill_notifier.clone(); let refill_task = tokio::spawn(async move { let mut interval = time::interval(refill_interval); loop { @@ -77,6 +82,7 @@ impl TokenBucket { break; }; bucket.refill(); + refill_notifer.notify_waiters(); } }); @@ -154,6 +160,13 @@ impl TokenBucket { } } } + + /// Consume tokens, blocking if not available + pub async fn consume(&self, tokens: u64) { + while !self.try_consume(tokens) { + self.refill_notifier.notified().await; + } + } } pub struct TokenBucketManager { @@ -177,10 +190,16 @@ impl TokenBucketManager { let retain_task = tokio::spawn(async move { loop { // Retain only buckets that are still in use + let old_len = buckets_clone.len(); buckets_clone.retain(|_, bucket| Arc::::strong_count(bucket) > 1); buckets_clone.shrink_to_fit(); // Sleep for a while before next retention check tokio::time::sleep(Duration::from_secs(5)).await; + tracing::info!( + "Retained buckets: {} ({} dropped)", + buckets_clone.len(), + old_len.saturating_sub(buckets_clone.len()) + ); } }); diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 01edc4e6..cdffd506 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -221,7 +221,7 @@ impl ForeignNetworkEntry { async fn recv(&self) -> Result { if let Some(o) = self.packet_recv.lock().await.recv().await { - tracing::info!("recv rpc packet in foreign network manager rpc transport"); + tracing::trace!("recv rpc packet in foreign network manager rpc transport"); Ok(o) } else { Err(Error::Unknown) @@ -335,7 +335,7 @@ impl ForeignNetworkEntry { tracing::warn!("invalid packet, skip"); continue; }; - tracing::info!(?hdr, "recv packet in foreign network manager"); + tracing::trace!(?hdr, "recv packet in foreign network manager"); let to_peer_id = hdr.to_peer_id.get(); if to_peer_id == my_node_id { if hdr.packet_type == PacketType::TaRpc as u8 diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 4bac1bd3..5213d036 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -40,7 +40,7 @@ use crate::{ peers::peer_session::{PeerSessionStore, SessionKey, UpsertResponderSessionReturn}, proto::{ api::instance::{PeerConnInfo, PeerConnStats}, - common::{SecureModeConfig, TunnelInfo}, + common::{LimiterConfig, SecureModeConfig, TunnelInfo}, peer_rpc::{ HandshakeRequest, PeerConnNoiseMsg1Pb, PeerConnNoiseMsg2Pb, PeerConnNoiseMsg3Pb, PeerConnSessionActionPb, SecureAuthLevel, @@ -1171,6 +1171,23 @@ impl PeerConn { }; self.counters.store(Some(Arc::new(counters))); + let is_foreign_network = conn_info_for_instrument.network_name + != self.global_ctx.get_network_identity().network_name; + let recv_limiter = if is_foreign_network { + let relay_network_bps_limit = self.global_ctx.get_flags().foreign_relay_bps_limit; + let limiter_config = LimiterConfig { + burst_rate: None, + bps: Some(relay_network_bps_limit), + fill_duration_ms: None, + }; + Some(self.global_ctx.token_bucket_manager().get_or_create( + &format!("{}:recv", conn_info_for_instrument.network_name), + limiter_config.into(), + )) + } else { + None + }; + let counters = self.counters.load_full().unwrap(); self.tasks.spawn( @@ -1185,8 +1202,9 @@ impl PeerConn { } let mut zc_packet = ret.unwrap(); + let buf_len = zc_packet.buf_len() as u64; - counters.traffic_rx_bytes.add(zc_packet.buf_len() as u64); + counters.traffic_rx_bytes.add(buf_len); counters.traffic_rx_packets.inc(); let Some(peer_mgr_hdr) = zc_packet.mut_peer_manager_header() else { @@ -1194,7 +1212,7 @@ impl PeerConn { "unexpected packet: {:?}, cannot decode peer manager hdr", zc_packet ); - continue; + break; }; if peer_mgr_hdr.packet_type == PacketType::Ping as u8 { @@ -1209,6 +1227,10 @@ impl PeerConn { } else if sender.send(zc_packet).await.is_err() { break; } + + if let Some(limiter) = recv_limiter.as_ref() { + limiter.consume(buf_len).await; + } } tracing::info!("end recving peer conn packet"); diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 335b1729..9cfb44b2 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -530,7 +530,7 @@ impl SyncedRouteInfo { for (peer_idx, peer_id_version) in conn_bitmap.peer_ids.iter().enumerate() { let connceted_peers = conn_bitmap.get_connected_peers(peer_idx); self.fill_empty_peer_info(&connceted_peers); - need_inc_version = self.update_conn_info_one_peer(peer_id_version, connceted_peers); + need_inc_version |= self.update_conn_info_one_peer(peer_id_version, connceted_peers); } if need_inc_version { self.version.inc(); @@ -548,7 +548,7 @@ impl SyncedRouteInfo { peer_conn_info.connected_peer_ids.iter().copied().collect(); self.fill_empty_peer_info(&connected_peers); - need_inc_version = self.update_conn_info_one_peer(&peer_id_version, connected_peers); + need_inc_version |= self.update_conn_info_one_peer(&peer_id_version, connected_peers); } if need_inc_version { self.version.inc(); diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index bc6b42be..b5e349c0 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -1476,7 +1476,12 @@ pub async fn relay_bps_limit_test(#[values(100, 200, 400, 800)] bps_limit: u64) let bps = bps as u64 / 1024; // allow 50kb jitter - assert!(bps >= bps_limit - 50 && bps <= bps_limit + 50); + assert!( + bps >= bps_limit - 50 && bps <= bps_limit + 50, + "bps: {}, bps_limit: {}", + bps, + bps_limit + ); drop_insts(insts).await; }