fix tests (#588)

fix proxy_three_node_disconnect_test and hole_punching_symmetric_only_random
This commit is contained in:
Sijie.Sun
2025-01-27 15:17:47 +08:00
committed by GitHub
parent d0f26d9303
commit 08546925cc
6 changed files with 110 additions and 66 deletions
+80 -60
View File
@@ -1,4 +1,5 @@
use std::{
fmt::Debug,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
@@ -12,6 +13,7 @@ use tokio::{
task::JoinSet,
time::{timeout, Interval},
};
use tracing::Instrument;
use crate::{
common::{error::Error, PeerId},
@@ -25,7 +27,7 @@ use crate::{
struct PingIntervalController {
throughput: Arc<Throughput>,
loss_rate_20: Arc<WindowLatency>,
loss_counter: Arc<AtomicU32>,
interval: Interval,
@@ -38,13 +40,27 @@ struct PingIntervalController {
last_throughput: Throughput,
}
impl std::fmt::Debug for PingIntervalController {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PingIntervalController")
.field("throughput", &self.throughput)
.field("loss_counter", &self.loss_counter)
.field("logic_time", &self.logic_time)
.field("last_send_logic_time", &self.last_send_logic_time)
.field("backoff_idx", &self.backoff_idx)
.field("max_backoff_idx", &self.max_backoff_idx)
.field("last_throughput", &self.last_throughput)
.finish()
}
}
impl PingIntervalController {
fn new(throughput: Arc<Throughput>, loss_rate_20: Arc<WindowLatency>) -> Self {
fn new(throughput: Arc<Throughput>, loss_counter: Arc<AtomicU32>) -> Self {
let last_throughput = *throughput;
Self {
throughput,
loss_rate_20,
loss_counter,
interval: tokio::time::interval(Duration::from_secs(1)),
logic_time: 0,
last_send_logic_time: 0,
@@ -69,13 +85,12 @@ impl PingIntervalController {
self.throughput.rx_packets() > self.last_throughput.rx_packets()
}
#[tracing::instrument]
fn should_send_ping(&mut self) -> bool {
if self.loss_rate_20.get_latency_us::<f64>() > 0.0 {
tracing::trace!(?self, "check should_send_ping");
if self.loss_counter.load(Ordering::Relaxed) > 0 {
self.backoff_idx = 0;
} else if self.tx_increase()
&& !self.rx_increase()
&& self.logic_time - self.last_send_logic_time > 2
{
} else if self.tx_increase() && !self.rx_increase() {
// if tx increase but rx not increase, we should do pingpong more frequently
self.backoff_idx = 0;
}
@@ -210,8 +225,8 @@ impl PeerConnPinger {
// one with 1% precision
let loss_rate_stats_1 = WindowLatency::new(100);
// one with 20% precision, so we can fast fail this conn.
let loss_rate_stats_20 = Arc::new(WindowLatency::new(5));
// disconnect the connection if lost 5 pingpong consecutively
let loss_counter = Arc::new(AtomicU32::new(0));
let stopped = Arc::new(AtomicU32::new(0));
@@ -220,72 +235,73 @@ impl PeerConnPinger {
let ctrl_resp_sender = self.ctrl_sender.clone();
let stopped_clone = stopped.clone();
let mut controller =
PingIntervalController::new(self.throughput_stats.clone(), loss_rate_stats_20.clone());
self.tasks.spawn(async move {
let mut req_seq = 0;
loop {
controller.tick().await;
PingIntervalController::new(self.throughput_stats.clone(), loss_counter.clone());
self.tasks.spawn(
async move {
let mut req_seq = 0;
loop {
controller.tick().await;
if stopped_clone.load(Ordering::Relaxed) != 0 {
return Ok(());
if stopped_clone.load(Ordering::Relaxed) != 0 {
return Ok(());
}
while pingpong_tasks.len() > 5 {
pingpong_tasks.join_next().await;
}
if !controller.should_send_ping() {
continue;
}
let mut sink = sink.clone();
let receiver = ctrl_resp_sender.subscribe();
let ping_res_sender = ping_res_sender.clone();
pingpong_tasks.spawn(async move {
let mut receiver = receiver.resubscribe();
let pingpong_once_ret = Self::do_pingpong_once(
my_node_id,
peer_id,
&mut sink,
&mut receiver,
req_seq,
)
.await;
if let Err(e) = ping_res_sender.send(pingpong_once_ret).await {
tracing::info!(?e, "pingpong task send result error, exit..");
};
});
req_seq = req_seq.wrapping_add(1);
}
while pingpong_tasks.len() > 5 {
pingpong_tasks.join_next().await;
}
if !controller.should_send_ping() {
continue;
}
let mut sink = sink.clone();
let receiver = ctrl_resp_sender.subscribe();
let ping_res_sender = ping_res_sender.clone();
pingpong_tasks.spawn(async move {
let mut receiver = receiver.resubscribe();
let pingpong_once_ret = Self::do_pingpong_once(
my_node_id,
peer_id,
&mut sink,
&mut receiver,
req_seq,
)
.await;
if let Err(e) = ping_res_sender.send(pingpong_once_ret).await {
tracing::info!(?e, "pingpong task send result error, exit..");
};
});
req_seq = req_seq.wrapping_add(1);
}
});
.instrument(tracing::info_span!(
"pingpong_controller",
?my_node_id,
?peer_id
)),
);
let mut counter: u64 = 0;
let throughput = self.throughput_stats.clone();
let mut last_rx_packets = throughput.rx_packets();
while let Some(ret) = ping_res_receiver.recv().await {
counter += 1;
if let Ok(lat) = ret {
latency_stats.record_latency(lat as u32);
loss_rate_stats_1.record_latency(0);
loss_rate_stats_20.record_latency(0);
} else {
loss_rate_stats_1.record_latency(1);
loss_rate_stats_20.record_latency(1);
loss_counter.fetch_add(1, Ordering::Relaxed);
}
let loss_rate_20: f64 = loss_rate_stats_20.get_latency_us();
let loss_rate_1: f64 = loss_rate_stats_1.get_latency_us();
tracing::trace!(
?ret,
?self,
?loss_rate_1,
?loss_rate_20,
"pingpong task recv pingpong_once result"
);
@@ -293,23 +309,27 @@ impl PeerConnPinger {
if last_rx_packets != current_rx_packets {
// if we receive some packet from peers, reset the counter to avoid conn close.
// conn will close only if we have 5 continous round pingpong loss after no packet received.
counter = 0;
loss_counter.store(0, Ordering::Relaxed);
}
tracing::debug!(
"counter: {}, loss_rate_1: {}, loss_rate_20: {}, cur_rx_packets: {}, last_rx: {}, node_id: {}",
counter, loss_rate_1, loss_rate_20, current_rx_packets, last_rx_packets, my_node_id
"loss_counter: {:?}, loss_rate_1: {}, cur_rx_packets: {}, last_rx: {}, node_id: {}",
loss_counter,
loss_rate_1,
current_rx_packets,
last_rx_packets,
my_node_id
);
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 100 && loss_rate_1 > 0.35) {
if loss_counter.load(Ordering::Relaxed) >= 5 {
tracing::warn!(
?ret,
?self,
?loss_rate_1,
?loss_rate_20,
?loss_counter,
?last_rx_packets,
?current_rx_packets,
"pingpong loss rate too high, closing"
"pingpong loss too much pingpong packet and no other ingress packets, closing the connection",
);
break;
}