mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-07 02:09:06 +00:00
add token bucket limiter on peer conn recv (#1842)
We should limit peer conn recv to make sure we don't recv too much from peers.
This commit is contained in:
@@ -3,6 +3,7 @@ use dashmap::DashMap;
|
|||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
use tokio::sync::Notify;
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
|
|
||||||
use crate::common::scoped_task::ScopedTask;
|
use crate::common::scoped_task::ScopedTask;
|
||||||
@@ -15,6 +16,8 @@ pub struct TokenBucket {
|
|||||||
config: BucketConfig, // Immutable configuration
|
config: BucketConfig, // Immutable configuration
|
||||||
refill_task: Mutex<Option<ScopedTask<()>>>, // Background refill task
|
refill_task: Mutex<Option<ScopedTask<()>>>, // Background refill task
|
||||||
start_time: Instant, // Bucket creation time
|
start_time: Instant, // Bucket creation time
|
||||||
|
|
||||||
|
refill_notifier: Arc<Notify>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy)]
|
||||||
@@ -64,11 +67,13 @@ impl TokenBucket {
|
|||||||
config,
|
config,
|
||||||
refill_task: Mutex::new(None),
|
refill_task: Mutex::new(None),
|
||||||
start_time: std::time::Instant::now(),
|
start_time: std::time::Instant::now(),
|
||||||
|
refill_notifier: Arc::new(Notify::new()),
|
||||||
});
|
});
|
||||||
|
|
||||||
// Start background refill task
|
// Start background refill task
|
||||||
let weak_bucket = Arc::downgrade(&arc_self);
|
let weak_bucket = Arc::downgrade(&arc_self);
|
||||||
let refill_interval = arc_self.config.refill_interval;
|
let refill_interval = arc_self.config.refill_interval;
|
||||||
|
let refill_notifer = arc_self.refill_notifier.clone();
|
||||||
let refill_task = tokio::spawn(async move {
|
let refill_task = tokio::spawn(async move {
|
||||||
let mut interval = time::interval(refill_interval);
|
let mut interval = time::interval(refill_interval);
|
||||||
loop {
|
loop {
|
||||||
@@ -77,6 +82,7 @@ impl TokenBucket {
|
|||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
bucket.refill();
|
bucket.refill();
|
||||||
|
refill_notifer.notify_waiters();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -154,6 +160,13 @@ impl TokenBucket {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Consume tokens, blocking if not available
|
||||||
|
pub async fn consume(&self, tokens: u64) {
|
||||||
|
while !self.try_consume(tokens) {
|
||||||
|
self.refill_notifier.notified().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TokenBucketManager {
|
pub struct TokenBucketManager {
|
||||||
@@ -177,10 +190,16 @@ impl TokenBucketManager {
|
|||||||
let retain_task = tokio::spawn(async move {
|
let retain_task = tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
// Retain only buckets that are still in use
|
// Retain only buckets that are still in use
|
||||||
|
let old_len = buckets_clone.len();
|
||||||
buckets_clone.retain(|_, bucket| Arc::<TokenBucket>::strong_count(bucket) > 1);
|
buckets_clone.retain(|_, bucket| Arc::<TokenBucket>::strong_count(bucket) > 1);
|
||||||
buckets_clone.shrink_to_fit();
|
buckets_clone.shrink_to_fit();
|
||||||
// Sleep for a while before next retention check
|
// Sleep for a while before next retention check
|
||||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
tracing::info!(
|
||||||
|
"Retained buckets: {} ({} dropped)",
|
||||||
|
buckets_clone.len(),
|
||||||
|
old_len.saturating_sub(buckets_clone.len())
|
||||||
|
);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -221,7 +221,7 @@ impl ForeignNetworkEntry {
|
|||||||
|
|
||||||
async fn recv(&self) -> Result<ZCPacket, Error> {
|
async fn recv(&self) -> Result<ZCPacket, Error> {
|
||||||
if let Some(o) = self.packet_recv.lock().await.recv().await {
|
if let Some(o) = self.packet_recv.lock().await.recv().await {
|
||||||
tracing::info!("recv rpc packet in foreign network manager rpc transport");
|
tracing::trace!("recv rpc packet in foreign network manager rpc transport");
|
||||||
Ok(o)
|
Ok(o)
|
||||||
} else {
|
} else {
|
||||||
Err(Error::Unknown)
|
Err(Error::Unknown)
|
||||||
@@ -335,7 +335,7 @@ impl ForeignNetworkEntry {
|
|||||||
tracing::warn!("invalid packet, skip");
|
tracing::warn!("invalid packet, skip");
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
tracing::info!(?hdr, "recv packet in foreign network manager");
|
tracing::trace!(?hdr, "recv packet in foreign network manager");
|
||||||
let to_peer_id = hdr.to_peer_id.get();
|
let to_peer_id = hdr.to_peer_id.get();
|
||||||
if to_peer_id == my_node_id {
|
if to_peer_id == my_node_id {
|
||||||
if hdr.packet_type == PacketType::TaRpc as u8
|
if hdr.packet_type == PacketType::TaRpc as u8
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ use crate::{
|
|||||||
peers::peer_session::{PeerSessionStore, SessionKey, UpsertResponderSessionReturn},
|
peers::peer_session::{PeerSessionStore, SessionKey, UpsertResponderSessionReturn},
|
||||||
proto::{
|
proto::{
|
||||||
api::instance::{PeerConnInfo, PeerConnStats},
|
api::instance::{PeerConnInfo, PeerConnStats},
|
||||||
common::{SecureModeConfig, TunnelInfo},
|
common::{LimiterConfig, SecureModeConfig, TunnelInfo},
|
||||||
peer_rpc::{
|
peer_rpc::{
|
||||||
HandshakeRequest, PeerConnNoiseMsg1Pb, PeerConnNoiseMsg2Pb, PeerConnNoiseMsg3Pb,
|
HandshakeRequest, PeerConnNoiseMsg1Pb, PeerConnNoiseMsg2Pb, PeerConnNoiseMsg3Pb,
|
||||||
PeerConnSessionActionPb, SecureAuthLevel,
|
PeerConnSessionActionPb, SecureAuthLevel,
|
||||||
@@ -1171,6 +1171,23 @@ impl PeerConn {
|
|||||||
};
|
};
|
||||||
self.counters.store(Some(Arc::new(counters)));
|
self.counters.store(Some(Arc::new(counters)));
|
||||||
|
|
||||||
|
let is_foreign_network = conn_info_for_instrument.network_name
|
||||||
|
!= self.global_ctx.get_network_identity().network_name;
|
||||||
|
let recv_limiter = if is_foreign_network {
|
||||||
|
let relay_network_bps_limit = self.global_ctx.get_flags().foreign_relay_bps_limit;
|
||||||
|
let limiter_config = LimiterConfig {
|
||||||
|
burst_rate: None,
|
||||||
|
bps: Some(relay_network_bps_limit),
|
||||||
|
fill_duration_ms: None,
|
||||||
|
};
|
||||||
|
Some(self.global_ctx.token_bucket_manager().get_or_create(
|
||||||
|
&format!("{}:recv", conn_info_for_instrument.network_name),
|
||||||
|
limiter_config.into(),
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let counters = self.counters.load_full().unwrap();
|
let counters = self.counters.load_full().unwrap();
|
||||||
|
|
||||||
self.tasks.spawn(
|
self.tasks.spawn(
|
||||||
@@ -1185,8 +1202,9 @@ impl PeerConn {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut zc_packet = ret.unwrap();
|
let mut zc_packet = ret.unwrap();
|
||||||
|
let buf_len = zc_packet.buf_len() as u64;
|
||||||
|
|
||||||
counters.traffic_rx_bytes.add(zc_packet.buf_len() as u64);
|
counters.traffic_rx_bytes.add(buf_len);
|
||||||
counters.traffic_rx_packets.inc();
|
counters.traffic_rx_packets.inc();
|
||||||
|
|
||||||
let Some(peer_mgr_hdr) = zc_packet.mut_peer_manager_header() else {
|
let Some(peer_mgr_hdr) = zc_packet.mut_peer_manager_header() else {
|
||||||
@@ -1194,7 +1212,7 @@ impl PeerConn {
|
|||||||
"unexpected packet: {:?}, cannot decode peer manager hdr",
|
"unexpected packet: {:?}, cannot decode peer manager hdr",
|
||||||
zc_packet
|
zc_packet
|
||||||
);
|
);
|
||||||
continue;
|
break;
|
||||||
};
|
};
|
||||||
|
|
||||||
if peer_mgr_hdr.packet_type == PacketType::Ping as u8 {
|
if peer_mgr_hdr.packet_type == PacketType::Ping as u8 {
|
||||||
@@ -1209,6 +1227,10 @@ impl PeerConn {
|
|||||||
} else if sender.send(zc_packet).await.is_err() {
|
} else if sender.send(zc_packet).await.is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(limiter) = recv_limiter.as_ref() {
|
||||||
|
limiter.consume(buf_len).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!("end recving peer conn packet");
|
tracing::info!("end recving peer conn packet");
|
||||||
|
|||||||
@@ -530,7 +530,7 @@ impl SyncedRouteInfo {
|
|||||||
for (peer_idx, peer_id_version) in conn_bitmap.peer_ids.iter().enumerate() {
|
for (peer_idx, peer_id_version) in conn_bitmap.peer_ids.iter().enumerate() {
|
||||||
let connceted_peers = conn_bitmap.get_connected_peers(peer_idx);
|
let connceted_peers = conn_bitmap.get_connected_peers(peer_idx);
|
||||||
self.fill_empty_peer_info(&connceted_peers);
|
self.fill_empty_peer_info(&connceted_peers);
|
||||||
need_inc_version = self.update_conn_info_one_peer(peer_id_version, connceted_peers);
|
need_inc_version |= self.update_conn_info_one_peer(peer_id_version, connceted_peers);
|
||||||
}
|
}
|
||||||
if need_inc_version {
|
if need_inc_version {
|
||||||
self.version.inc();
|
self.version.inc();
|
||||||
@@ -548,7 +548,7 @@ impl SyncedRouteInfo {
|
|||||||
peer_conn_info.connected_peer_ids.iter().copied().collect();
|
peer_conn_info.connected_peer_ids.iter().copied().collect();
|
||||||
|
|
||||||
self.fill_empty_peer_info(&connected_peers);
|
self.fill_empty_peer_info(&connected_peers);
|
||||||
need_inc_version = self.update_conn_info_one_peer(&peer_id_version, connected_peers);
|
need_inc_version |= self.update_conn_info_one_peer(&peer_id_version, connected_peers);
|
||||||
}
|
}
|
||||||
if need_inc_version {
|
if need_inc_version {
|
||||||
self.version.inc();
|
self.version.inc();
|
||||||
|
|||||||
@@ -1476,7 +1476,12 @@ pub async fn relay_bps_limit_test(#[values(100, 200, 400, 800)] bps_limit: u64)
|
|||||||
|
|
||||||
let bps = bps as u64 / 1024;
|
let bps = bps as u64 / 1024;
|
||||||
// allow 50kb jitter
|
// allow 50kb jitter
|
||||||
assert!(bps >= bps_limit - 50 && bps <= bps_limit + 50);
|
assert!(
|
||||||
|
bps >= bps_limit - 50 && bps <= bps_limit + 50,
|
||||||
|
"bps: {}, bps_limit: {}",
|
||||||
|
bps,
|
||||||
|
bps_limit
|
||||||
|
);
|
||||||
|
|
||||||
drop_insts(insts).await;
|
drop_insts(insts).await;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user