From b80801cfdbe94bb7a9aa9a71ac8fd107342686ff Mon Sep 17 00:00:00 2001 From: fanyang Date: Mon, 4 May 2026 19:53:10 +0800 Subject: [PATCH] fix: avoid selecting unknown-latency peer conns --- easytier/src/peers/peer.rs | 97 ++++++++++++++++++++++++--------- easytier/src/peers/peer_conn.rs | 5 ++ 2 files changed, 75 insertions(+), 27 deletions(-) diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index c46e8001..1563aebb 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -188,37 +188,45 @@ impl Peer { async fn select_conn(&self) -> Option { let default_conn_id = self.default_conn_id.load(); - let default_conn = self.conns.get(&default_conn_id).and_then(|conn| { - if conn.is_closed() { - None + let latency_score = |latency_us| { + if latency_us == 0 { + u64::MAX } else { - Some((conn.priority(), conn.clone())) + latency_us } - }); - - if let Some((default_priority, default_conn)) = default_conn { - let has_better_conn = self.conns.iter().any(|conn| { - !conn.value().is_closed() && conn.value().priority() < default_priority - }); - if !has_better_conn { - return Some(default_conn); - } - } - - // Prefer lower listener priority first, then use latency to pick the - // initial connection within that priority class. Keep the selected conn - // sticky until a strictly better priority appears. - let selected_conn_id = self + }; + let selected_conn = self .conns .iter() .filter(|conn| !conn.value().is_closed()) - .min_by_key(|conn| (conn.value().priority(), conn.value().get_stats().latency_us)) - .map(|conn| conn.get_conn_id()); + .min_by_key(|conn| { + ( + conn.value().priority(), + latency_score(conn.value().get_stats().latency_us), + ) + }) + .map(|conn| { + ( + conn.get_conn_id(), + conn.value().priority(), + latency_score(conn.value().get_stats().latency_us), + ) + }); - selected_conn_id.and_then(|conn_id| { - self.default_conn_id.store(conn_id); - self.conns.get(&conn_id).map(|conn| conn.clone()) - }) + let (selected_conn_id, selected_priority, selected_latency) = selected_conn?; + + if let Some(default_conn) = self.conns.get(&default_conn_id) + && !default_conn.is_closed() + && ( + default_conn.priority(), + latency_score(default_conn.get_stats().latency_us), + ) == (selected_priority, selected_latency) + { + return Some(default_conn.clone()); + } + + self.default_conn_id.store(selected_conn_id); + self.conns.get(&selected_conn_id).map(|conn| conn.clone()) } pub async fn send_msg(&self, msg: ZCPacket) -> Result<(), Error> { @@ -409,7 +417,7 @@ mod tests { } #[tokio::test] - async fn select_conn_prefers_lower_priority_before_latency() { + async fn select_conn_prefers_priority_then_latency() { let (packet_send, _packet_recv) = create_packet_recv_chan(); let global_ctx = get_mock_global_ctx(); let local_peer_id = new_peer_id(); @@ -425,6 +433,7 @@ mod tests { ps.clone(), ); low_client_conn.set_priority(100); + low_client_conn.record_latency_for_test(1000); let low_conn_id = low_client_conn.get_conn_id(); let mut low_server_conn = PeerConn::new( remote_peer_id, @@ -449,6 +458,8 @@ mod tests { ps.clone(), ); same_priority_client_conn.set_priority(100); + same_priority_client_conn.record_latency_for_test(10); + let same_priority_conn_id = same_priority_client_conn.get_conn_id(); let mut same_priority_server_conn = PeerConn::new( remote_peer_id, global_ctx.clone(), @@ -462,7 +473,39 @@ mod tests { client_ret.unwrap(); server_ret.unwrap(); peer.add_peer_conn(same_priority_client_conn).await.unwrap(); - assert_eq!(peer.select_conn().await.unwrap().get_conn_id(), low_conn_id); + assert_eq!( + peer.select_conn().await.unwrap().get_conn_id(), + same_priority_conn_id + ); + + let (unknown_latency_client_tunnel, unknown_latency_server_tunnel) = + create_ring_tunnel_pair(); + let mut unknown_latency_client_conn = PeerConn::new( + local_peer_id, + global_ctx.clone(), + unknown_latency_client_tunnel, + ps.clone(), + ); + unknown_latency_client_conn.set_priority(100); + let mut unknown_latency_server_conn = PeerConn::new( + remote_peer_id, + global_ctx.clone(), + unknown_latency_server_tunnel, + ps.clone(), + ); + let (client_ret, server_ret) = tokio::join!( + unknown_latency_client_conn.do_handshake_as_client(), + unknown_latency_server_conn.do_handshake_as_server() + ); + client_ret.unwrap(); + server_ret.unwrap(); + peer.add_peer_conn(unknown_latency_client_conn) + .await + .unwrap(); + assert_eq!( + peer.select_conn().await.unwrap().get_conn_id(), + same_priority_conn_id + ); let (high_client_tunnel, high_server_tunnel) = create_ring_tunnel_pair(); let mut high_client_conn = PeerConn::new( diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 641a6d42..2b1c6921 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -452,6 +452,11 @@ impl PeerConn { self.priority } + #[cfg(test)] + pub(crate) fn record_latency_for_test(&self, latency_us: u32) { + self.latency_stats.record_latency(latency_us); + } + pub fn is_closed(&self) -> bool { self.close_event_notifier.is_closed() }