mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-07 10:14:35 +00:00
make ping smarter
This commit is contained in:
@@ -220,7 +220,7 @@ impl PeerCenterInstance {
|
|||||||
.load()
|
.load()
|
||||||
.elapsed()
|
.elapsed()
|
||||||
.as_secs()
|
.as_secs()
|
||||||
> 60
|
> 120
|
||||||
{
|
{
|
||||||
ctx.job_ctx.global_peer_map_digest.store(Digest::default());
|
ctx.job_ctx.global_peer_map_digest.store(Digest::default());
|
||||||
}
|
}
|
||||||
@@ -239,12 +239,12 @@ impl PeerCenterInstance {
|
|||||||
"get global info from center server got error result: {:?}",
|
"get global info from center server got error result: {:?}",
|
||||||
ret
|
ret
|
||||||
);
|
);
|
||||||
return Ok(1000);
|
return Ok(10000);
|
||||||
};
|
};
|
||||||
|
|
||||||
if resp == GetGlobalPeerMapResponse::default() {
|
if resp == GetGlobalPeerMapResponse::default() {
|
||||||
// digest match, no need to update
|
// digest match, no need to update
|
||||||
return Ok(5000);
|
return Ok(15000);
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
@@ -263,7 +263,7 @@ impl PeerCenterInstance {
|
|||||||
.global_peer_map_update_time
|
.global_peer_map_update_time
|
||||||
.store(Instant::now());
|
.store(Instant::now());
|
||||||
|
|
||||||
Ok(5000)
|
Ok(15000)
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
@@ -426,7 +426,7 @@ mod tests {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Duration::from_secs(10),
|
Duration::from_secs(20),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -435,7 +435,7 @@ mod tests {
|
|||||||
let rpc_service = pc.get_rpc_service();
|
let rpc_service = pc.get_rpc_service();
|
||||||
wait_for_condition(
|
wait_for_condition(
|
||||||
|| async { rpc_service.global_peer_map.read().unwrap().map.len() == 3 },
|
|| async { rpc_service.global_peer_map.read().unwrap().map.len() == 3 },
|
||||||
Duration::from_secs(10),
|
Duration::from_secs(20),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -309,6 +309,7 @@ impl PeerConn {
|
|||||||
self.ctrl_resp_sender.clone(),
|
self.ctrl_resp_sender.clone(),
|
||||||
self.latency_stats.clone(),
|
self.latency_stats.clone(),
|
||||||
self.loss_rate_stats.clone(),
|
self.loss_rate_stats.clone(),
|
||||||
|
self.throughput.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let close_event_sender = self.close_event_sender.clone().unwrap();
|
let close_event_sender = self.close_event_sender.clone().unwrap();
|
||||||
@@ -388,6 +389,7 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::common::global_ctx::tests::get_mock_global_ctx;
|
use crate::common::global_ctx::tests::get_mock_global_ctx;
|
||||||
use crate::common::new_peer_id;
|
use crate::common::new_peer_id;
|
||||||
|
use crate::common::scoped_task::ScopedTask;
|
||||||
use crate::tunnel::filter::tests::DropSendTunnelFilter;
|
use crate::tunnel::filter::tests::DropSendTunnelFilter;
|
||||||
use crate::tunnel::filter::PacketRecorderTunnelFilter;
|
use crate::tunnel::filter::PacketRecorderTunnelFilter;
|
||||||
use crate::tunnel::ring::create_ring_tunnel_pair;
|
use crate::tunnel::ring::create_ring_tunnel_pair;
|
||||||
@@ -429,13 +431,25 @@ mod tests {
|
|||||||
assert_eq!(c_peer.get_network_identity(), NetworkIdentity::default());
|
assert_eq!(c_peer.get_network_identity(), NetworkIdentity::default());
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn peer_conn_pingpong_test_common(drop_start: u32, drop_end: u32, conn_closed: bool) {
|
async fn peer_conn_pingpong_test_common(
|
||||||
|
drop_start: u32,
|
||||||
|
drop_end: u32,
|
||||||
|
conn_closed: bool,
|
||||||
|
drop_both: bool,
|
||||||
|
) {
|
||||||
let (c, s) = create_ring_tunnel_pair();
|
let (c, s) = create_ring_tunnel_pair();
|
||||||
|
|
||||||
// dropping 1-3 packets should not affect pingpong
|
// dropping 1-3 packets should not affect pingpong
|
||||||
let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
|
let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
|
||||||
let c = TunnelWithFilter::new(c, c_recorder.clone());
|
let c = TunnelWithFilter::new(c, c_recorder.clone());
|
||||||
|
|
||||||
|
let s = if drop_both {
|
||||||
|
let s_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
|
||||||
|
Box::new(TunnelWithFilter::new(s, s_recorder.clone()))
|
||||||
|
} else {
|
||||||
|
s
|
||||||
|
};
|
||||||
|
|
||||||
let c_peer_id = new_peer_id();
|
let c_peer_id = new_peer_id();
|
||||||
let s_peer_id = new_peer_id();
|
let s_peer_id = new_peer_id();
|
||||||
|
|
||||||
@@ -462,7 +476,15 @@ mod tests {
|
|||||||
.start_recv_loop(tokio::sync::mpsc::channel(200).0)
|
.start_recv_loop(tokio::sync::mpsc::channel(200).0)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// wait 5s, conn should not be disconnected
|
let throughput = c_peer.throughput.clone();
|
||||||
|
let _t = ScopedTask::from(tokio::spawn(async move {
|
||||||
|
// when not dropping on both sides, mock some rx traffic for the client peer to exercise the pinger
|
||||||
|
while !drop_both {
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
throughput.record_rx_bytes(3);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_secs(15)).await;
|
tokio::time::sleep(Duration::from_secs(15)).await;
|
||||||
|
|
||||||
if conn_closed {
|
if conn_closed {
|
||||||
@@ -473,9 +495,18 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn peer_conn_pingpong_timeout() {
|
async fn peer_conn_pingpong_timeout_not_close() {
|
||||||
peer_conn_pingpong_test_common(3, 5, false).await;
|
peer_conn_pingpong_test_common(3, 5, false, false).await;
|
||||||
peer_conn_pingpong_test_common(5, 12, true).await;
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn peer_conn_pingpong_oneside_timeout() {
|
||||||
|
peer_conn_pingpong_test_common(4, 12, false, false).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn peer_conn_pingpong_bothside_timeout() {
|
||||||
|
peer_conn_pingpong_test_common(4, 12, true, true).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|||||||
@@ -6,18 +6,98 @@ use std::{
|
|||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use tokio::{sync::broadcast, task::JoinSet, time::timeout};
|
use rand::{thread_rng, Rng};
|
||||||
|
use tokio::{
|
||||||
|
sync::broadcast,
|
||||||
|
task::JoinSet,
|
||||||
|
time::{timeout, Interval},
|
||||||
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
common::{error::Error, PeerId},
|
common::{error::Error, PeerId},
|
||||||
tunnel::{
|
tunnel::{
|
||||||
mpsc::MpscTunnelSender,
|
mpsc::MpscTunnelSender,
|
||||||
packet_def::{PacketType, ZCPacket},
|
packet_def::{PacketType, ZCPacket},
|
||||||
stats::WindowLatency,
|
stats::{Throughput, WindowLatency},
|
||||||
TunnelError,
|
TunnelError,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct PingIntervalController {
|
||||||
|
throughput: Arc<Throughput>,
|
||||||
|
loss_rate_20: Arc<WindowLatency>,
|
||||||
|
|
||||||
|
interval: Interval,
|
||||||
|
|
||||||
|
logic_time: u64,
|
||||||
|
last_send_logic_time: u64,
|
||||||
|
|
||||||
|
backoff_idx: i32,
|
||||||
|
max_backoff_idx: i32,
|
||||||
|
|
||||||
|
last_throughput: Throughput,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PingIntervalController {
|
||||||
|
fn new(throughput: Arc<Throughput>, loss_rate_20: Arc<WindowLatency>) -> Self {
|
||||||
|
let last_throughput = *throughput;
|
||||||
|
|
||||||
|
Self {
|
||||||
|
throughput,
|
||||||
|
loss_rate_20,
|
||||||
|
interval: tokio::time::interval(Duration::from_secs(1)),
|
||||||
|
logic_time: 0,
|
||||||
|
last_send_logic_time: 0,
|
||||||
|
|
||||||
|
backoff_idx: 0,
|
||||||
|
max_backoff_idx: 5,
|
||||||
|
|
||||||
|
last_throughput,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn tick(&mut self) {
|
||||||
|
self.interval.tick().await;
|
||||||
|
self.logic_time += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tx_increase(&self) -> bool {
|
||||||
|
self.throughput.tx_packets() > self.last_throughput.tx_packets()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rx_increase(&self) -> bool {
|
||||||
|
self.throughput.rx_packets() > self.last_throughput.rx_packets()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn should_send_ping(&mut self) -> bool {
|
||||||
|
if self.loss_rate_20.get_latency_us::<f64>() > 0.0 {
|
||||||
|
self.backoff_idx = 0;
|
||||||
|
} else if self.tx_increase()
|
||||||
|
&& !self.rx_increase()
|
||||||
|
&& self.logic_time - self.last_send_logic_time > 2
|
||||||
|
{
|
||||||
|
// if tx increase but rx not increase, we should do pingpong more frequently
|
||||||
|
self.backoff_idx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.last_throughput = *self.throughput;
|
||||||
|
|
||||||
|
if (self.logic_time - self.last_send_logic_time) < (1 << self.backoff_idx) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.backoff_idx = std::cmp::min(self.backoff_idx + 1, self.max_backoff_idx);
|
||||||
|
|
||||||
|
// use this makes two peers not pingpong at the same time
|
||||||
|
if self.backoff_idx > self.max_backoff_idx - 2 && thread_rng().gen_bool(0.2) {
|
||||||
|
self.backoff_idx -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.last_send_logic_time = self.logic_time;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct PeerConnPinger {
|
pub struct PeerConnPinger {
|
||||||
my_peer_id: PeerId,
|
my_peer_id: PeerId,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
@@ -25,6 +105,7 @@ pub struct PeerConnPinger {
|
|||||||
ctrl_sender: broadcast::Sender<ZCPacket>,
|
ctrl_sender: broadcast::Sender<ZCPacket>,
|
||||||
latency_stats: Arc<WindowLatency>,
|
latency_stats: Arc<WindowLatency>,
|
||||||
loss_rate_stats: Arc<AtomicU32>,
|
loss_rate_stats: Arc<AtomicU32>,
|
||||||
|
throughput_stats: Arc<Throughput>,
|
||||||
tasks: JoinSet<Result<(), TunnelError>>,
|
tasks: JoinSet<Result<(), TunnelError>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -45,6 +126,7 @@ impl PeerConnPinger {
|
|||||||
ctrl_sender: broadcast::Sender<ZCPacket>,
|
ctrl_sender: broadcast::Sender<ZCPacket>,
|
||||||
latency_stats: Arc<WindowLatency>,
|
latency_stats: Arc<WindowLatency>,
|
||||||
loss_rate_stats: Arc<AtomicU32>,
|
loss_rate_stats: Arc<AtomicU32>,
|
||||||
|
throughput_stats: Arc<Throughput>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
my_peer_id,
|
my_peer_id,
|
||||||
@@ -54,6 +136,7 @@ impl PeerConnPinger {
|
|||||||
latency_stats,
|
latency_stats,
|
||||||
ctrl_sender,
|
ctrl_sender,
|
||||||
loss_rate_stats,
|
loss_rate_stats,
|
||||||
|
throughput_stats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,17 +208,23 @@ impl PeerConnPinger {
|
|||||||
|
|
||||||
let (ping_res_sender, mut ping_res_receiver) = tokio::sync::mpsc::channel(100);
|
let (ping_res_sender, mut ping_res_receiver) = tokio::sync::mpsc::channel(100);
|
||||||
|
|
||||||
|
// one with 1% precision
|
||||||
|
let loss_rate_stats_1 = WindowLatency::new(100);
|
||||||
|
// one with 20% precision, so we can fast fail this conn.
|
||||||
|
let loss_rate_stats_20 = Arc::new(WindowLatency::new(5));
|
||||||
|
|
||||||
let stopped = Arc::new(AtomicU32::new(0));
|
let stopped = Arc::new(AtomicU32::new(0));
|
||||||
|
|
||||||
// generate pingpong tasks periodically; the loop below wakes up about once per second
|
// generate pingpong tasks periodically; the loop below wakes up about once per second
|
||||||
let mut pingpong_tasks = JoinSet::new();
|
let mut pingpong_tasks = JoinSet::new();
|
||||||
let ctrl_resp_sender = self.ctrl_sender.clone();
|
let ctrl_resp_sender = self.ctrl_sender.clone();
|
||||||
let stopped_clone = stopped.clone();
|
let stopped_clone = stopped.clone();
|
||||||
|
let mut controller =
|
||||||
|
PingIntervalController::new(self.throughput_stats.clone(), loss_rate_stats_20.clone());
|
||||||
self.tasks.spawn(async move {
|
self.tasks.spawn(async move {
|
||||||
let mut req_seq = 0;
|
let mut req_seq = 0;
|
||||||
loop {
|
loop {
|
||||||
let receiver = ctrl_resp_sender.subscribe();
|
controller.tick().await;
|
||||||
let ping_res_sender = ping_res_sender.clone();
|
|
||||||
|
|
||||||
if stopped_clone.load(Ordering::Relaxed) != 0 {
|
if stopped_clone.load(Ordering::Relaxed) != 0 {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@@ -145,7 +234,13 @@ impl PeerConnPinger {
|
|||||||
pingpong_tasks.join_next().await;
|
pingpong_tasks.join_next().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !controller.should_send_ping() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let mut sink = sink.clone();
|
let mut sink = sink.clone();
|
||||||
|
let receiver = ctrl_resp_sender.subscribe();
|
||||||
|
let ping_res_sender = ping_res_sender.clone();
|
||||||
pingpong_tasks.spawn(async move {
|
pingpong_tasks.spawn(async move {
|
||||||
let mut receiver = receiver.resubscribe();
|
let mut receiver = receiver.resubscribe();
|
||||||
let pingpong_once_ret = Self::do_pingpong_once(
|
let pingpong_once_ret = Self::do_pingpong_once(
|
||||||
@@ -163,16 +258,12 @@ impl PeerConnPinger {
|
|||||||
});
|
});
|
||||||
|
|
||||||
req_seq = req_seq.wrapping_add(1);
|
req_seq = req_seq.wrapping_add(1);
|
||||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// one with 1% precision
|
|
||||||
let loss_rate_stats_1 = WindowLatency::new(100);
|
|
||||||
// one with 20% precision, so we can fast fail this conn.
|
|
||||||
let loss_rate_stats_20 = WindowLatency::new(5);
|
|
||||||
|
|
||||||
let mut counter: u64 = 0;
|
let mut counter: u64 = 0;
|
||||||
|
let throughput = self.throughput_stats.clone();
|
||||||
|
let mut last_rx_packets = throughput.rx_packets();
|
||||||
|
|
||||||
while let Some(ret) = ping_res_receiver.recv().await {
|
while let Some(ret) = ping_res_receiver.recv().await {
|
||||||
counter += 1;
|
counter += 1;
|
||||||
@@ -199,16 +290,29 @@ impl PeerConnPinger {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) {
|
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) {
|
||||||
|
let current_rx_packets = throughput.rx_packets();
|
||||||
|
let need_close = if last_rx_packets != current_rx_packets {
|
||||||
|
// if we receive some packet from peers, we should relax the condition
|
||||||
|
counter > 50 && loss_rate_1 > 0.5
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
};
|
||||||
|
|
||||||
|
if need_close {
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
?ret,
|
?ret,
|
||||||
?self,
|
?self,
|
||||||
?loss_rate_1,
|
?loss_rate_1,
|
||||||
?loss_rate_20,
|
?loss_rate_20,
|
||||||
|
?last_rx_packets,
|
||||||
|
?current_rx_packets,
|
||||||
"pingpong loss rate too high, closing"
|
"pingpong loss rate too high, closing"
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
last_rx_packets = throughput.rx_packets();
|
||||||
self.loss_rate_stats
|
self.loss_rate_stats
|
||||||
.store((loss_rate_1 * 100.0) as u32, Ordering::Relaxed);
|
.store((loss_rate_1 * 100.0) as u32, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -373,7 +373,10 @@ pub async fn subnet_proxy_three_node_test(
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[serial_test::serial]
|
#[serial_test::serial]
|
||||||
pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str) {
|
pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str) {
|
||||||
use crate::tunnel::wireguard::{WgConfig, WgTunnelConnector};
|
use crate::{
|
||||||
|
common::scoped_task::ScopedTask,
|
||||||
|
tunnel::wireguard::{WgConfig, WgTunnelConnector},
|
||||||
|
};
|
||||||
|
|
||||||
let insts = init_three_node(proto).await;
|
let insts = init_three_node(proto).await;
|
||||||
let mut inst4 = Instance::new(get_inst_config("inst4", Some("net_d"), "10.144.144.4"));
|
let mut inst4 = Instance::new(get_inst_config("inst4", Some("net_d"), "10.144.144.4"));
|
||||||
@@ -417,16 +420,25 @@ pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str
|
|||||||
);
|
);
|
||||||
|
|
||||||
set_link_status("net_d", false);
|
set_link_status("net_d", false);
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(8)).await;
|
let _t = ScopedTask::from(tokio::spawn(async move {
|
||||||
let routes = insts[0].get_peer_manager().list_routes().await;
|
// do some ping in net_a to trigger net_c pingpong
|
||||||
assert!(
|
loop {
|
||||||
routes
|
ping_test("net_a", "10.144.144.4", Some(1)).await;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
wait_for_condition(
|
||||||
|
|| async {
|
||||||
|
insts[0]
|
||||||
|
.get_peer_manager()
|
||||||
|
.list_routes()
|
||||||
|
.await
|
||||||
.iter()
|
.iter()
|
||||||
.find(|r| r.peer_id == inst4.peer_id())
|
.find(|r| r.peer_id == inst4.peer_id())
|
||||||
.is_none(),
|
.is_none()
|
||||||
"inst4 should not be in inst1's route list, {:?}",
|
},
|
||||||
routes
|
Duration::from_secs(15),
|
||||||
);
|
)
|
||||||
|
.await;
|
||||||
set_link_status("net_d", true);
|
set_link_status("net_d", true);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ impl WindowLatency {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default, Copy, Clone)]
|
||||||
pub struct Throughput {
|
pub struct Throughput {
|
||||||
tx_bytes: u64,
|
tx_bytes: u64,
|
||||||
rx_bytes: u64,
|
rx_bytes: u64,
|
||||||
|
|||||||
Reference in New Issue
Block a user