From 55f15bb6f035128b3df271320a79066ba250b7a3 Mon Sep 17 00:00:00 2001 From: fanyang Date: Fri, 8 May 2026 22:08:51 +0800 Subject: [PATCH] fix(connector): classify manual reconnect timeouts by stage (#2062) --- easytier/src/connector/manual.rs | 214 ++++++++++++++++++++++------- easytier/src/peers/peer_manager.rs | 17 ++- 2 files changed, 174 insertions(+), 57 deletions(-) diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index 8e9fd902..c797c50c 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -1,6 +1,8 @@ use std::{ collections::BTreeSet, + future::Future, sync::{Arc, Weak}, + time::{Duration, Instant}, }; use dashmap::DashSet; @@ -16,7 +18,7 @@ use crate::{ }, rpc_types::{self, controller::BaseController}, }, - tunnel::{IpVersion, TunnelConnector}, + tunnel::{IpVersion, TunnelConnector, TunnelScheme, matches_scheme}, utils::weak_upgrade, }; @@ -83,6 +85,55 @@ impl ManualConnectorManager { ret } + fn reconnect_timeout(dead_url: &url::Url) -> Duration { + let use_long_timeout = matches_scheme!( + dead_url, + TunnelScheme::Http | TunnelScheme::Https | TunnelScheme::Txt | TunnelScheme::Srv + ) || matches!(dead_url.scheme(), "ws" | "wss"); + + Duration::from_secs(if use_long_timeout { 20 } else { 2 }) + } + + fn remaining_budget(started_at: Instant, total_timeout: Duration) -> Option { + let remaining = total_timeout.checked_sub(started_at.elapsed())?; + (!remaining.is_zero()).then_some(remaining) + } + + fn emit_connect_error( + data: &ConnectorManagerData, + dead_url: &url::Url, + ip_version: IpVersion, + error: &Error, + ) { + data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( + dead_url.to_string(), + format!("{:?}", ip_version), + format!("{:#?}", error), + )); + } + + fn reconnect_timeout_error(stage: &str, duration: Duration) -> Error { + Error::AnyhowError(anyhow::anyhow!("{} timeout after {:?}", stage, duration)) + } + + async fn with_reconnect_timeout( + stage: &'static str, + started_at: Instant, + total_timeout: Duration, + fut: F, + ) -> Result + where + F: Future>, + { + let remaining = Self::remaining_budget(started_at, total_timeout) + .ok_or_else(|| Self::reconnect_timeout_error(stage, started_at.elapsed()))?; + timeout(remaining, fut) + .await + .map_err(|_| Self::reconnect_timeout_error(stage, remaining))? + } +} + +impl ManualConnectorManager { pub fn add_connector(&self, connector: T) where T: TunnelConnector + 'static, @@ -242,11 +293,18 @@ impl ManualConnectorManager { async fn conn_reconnect_with_ip_version( data: Arc, - dead_url: String, + dead_url: url::Url, ip_version: IpVersion, + started_at: Instant, + total_timeout: Duration, ) -> Result { - let connector = - create_connector_by_url(&dead_url, &data.global_ctx.clone(), ip_version).await?; + let connector = Self::with_reconnect_timeout( + "resolve", + started_at, + total_timeout, + create_connector_by_url(dead_url.as_str(), &data.global_ctx, ip_version), + ) + .await?; data.global_ctx .issue_event(GlobalCtxEvent::Connecting(connector.remote_url())); @@ -257,10 +315,25 @@ impl ManualConnectorManager { ))); }; - let (peer_id, conn_id) = pm.try_direct_connect(connector).await?; + let tunnel = Self::with_reconnect_timeout( + "connect", + started_at, + total_timeout, + pm.connect_tunnel(connector), + ) + .await?; + + let (peer_id, conn_id) = Self::with_reconnect_timeout( + "handshake", + started_at, + total_timeout, + pm.add_client_tunnel_with_peer_id_hint(tunnel, true, None), + ) + .await?; + tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); Ok(ReconnResult { - dead_url, + dead_url: dead_url.to_string(), peer_id, conn_id, }) @@ -273,22 +346,33 @@ impl ManualConnectorManager { tracing::info!("reconnect: {}", dead_url); let mut ip_versions = vec![]; - if dead_url.scheme() == "ring" || dead_url.scheme() == "txt" || dead_url.scheme() == "srv" { + if matches_scheme!( + dead_url, + TunnelScheme::Ring | TunnelScheme::Txt | TunnelScheme::Srv + ) { ip_versions.push(IpVersion::Both); } else { - let converted_dead_url = crate::common::idn::convert_idn_to_ascii(dead_url.clone())?; - let addrs = match socket_addrs(&converted_dead_url, || Some(1000)).await { + let converted_dead_url = + match crate::common::idn::convert_idn_to_ascii(dead_url.clone()) { + Ok(url) => url, + Err(error) => { + let error: Error = error.into(); + Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error); + return Err(error); + } + }; + let addrs = match Self::with_reconnect_timeout( + "resolve", + Instant::now(), + Self::reconnect_timeout(&dead_url), + socket_addrs(&converted_dead_url, || Some(1000)), + ) + .await + { Ok(addrs) => addrs, - Err(e) => { - data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( - dead_url.to_string(), - format!("{:?}", IpVersion::Both), - format!("{:?}", e), - )); - return Err(Error::AnyhowError(anyhow::anyhow!( - "get ip from url failed: {:?}", - e - ))); + Err(error) => { + Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error); + return Err(error); } }; tracing::info!(?addrs, ?dead_url, "get ip from url done"); @@ -313,46 +397,24 @@ impl ManualConnectorManager { "cannot get ip from url" ))); for ip_version in ip_versions { - let use_long_timeout = dead_url.scheme() == "http" - || dead_url.scheme() == "https" - || dead_url.scheme() == "ws" - || dead_url.scheme() == "wss" - || dead_url.scheme() == "txt" - || dead_url.scheme() == "srv"; - let ret = timeout( - // allow http/websocket connector to wait longer - std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }), - Self::conn_reconnect_with_ip_version( - data.clone(), - dead_url.to_string(), - ip_version, - ), + let started_at = Instant::now(); + let ret = Self::conn_reconnect_with_ip_version( + data.clone(), + dead_url.clone(), + ip_version, + started_at, + Self::reconnect_timeout(&dead_url), ) .await; tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret); match ret { - Ok(Ok(_)) => { - // 外层和内层都成功:解包并跳出 - reconn_ret = ret.unwrap(); - break; - } - Ok(Err(e)) => { - // 外层成功,内层失败 - reconn_ret = Err(e); - } - Err(e) => { - // 外层失败 - reconn_ret = Err(e.into()); + Ok(result) => return Ok(result), + Err(error) => { + Self::emit_connect_error(&data, &dead_url, ip_version, &error); + reconn_ret = Err(error); } } - - // 发送事件(只有在未 break 时才执行) - data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( - dead_url.to_string(), - format!("{:?}", ip_version), - format!("{:?}", reconn_ret), - )); } reconn_ret @@ -388,6 +450,54 @@ mod tests { use super::*; + #[tokio::test] + async fn reconnect_timeout_reports_exhausted_budget_for_stage() { + let started_at = Instant::now() - Duration::from_millis(50); + let err = ManualConnectorManager::with_reconnect_timeout( + "resolve", + started_at, + Duration::from_millis(1), + async { Ok::<(), Error>(()) }, + ) + .await + .unwrap_err(); + + let message = err.to_string(); + assert!(message.contains("resolve timeout after")); + } + + #[tokio::test] + async fn reconnect_timeout_reports_stage_timeout_with_remaining_budget() { + let err = ManualConnectorManager::with_reconnect_timeout( + "handshake", + Instant::now(), + Duration::from_millis(10), + async { + tokio::time::sleep(Duration::from_millis(50)).await; + Ok::<(), Error>(()) + }, + ) + .await + .unwrap_err(); + + let message = err.to_string(); + assert!(message.contains("handshake timeout after")); + } + + #[tokio::test] + async fn reconnect_timeout_preserves_success_within_budget() { + let result = ManualConnectorManager::with_reconnect_timeout( + "connect", + Instant::now(), + Duration::from_millis(50), + async { Ok::<_, Error>(123_u32) }, + ) + .await + .unwrap(); + + assert_eq!(result, 123); + } + #[tokio::test] async fn test_reconnect_with_connecting_addr() { set_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS, 1); diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index c5d03fdd..a2748d26 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -636,20 +636,27 @@ impl PeerManager { #[tracing::instrument] pub async fn try_direct_connect_with_peer_id_hint( &self, - mut connector: C, + connector: C, peer_id_hint: Option, ) -> Result<(PeerId, PeerConnId), Error> where C: TunnelConnector + Debug, { - let ns = self.global_ctx.net_ns.clone(); - let t = ns - .run_async(|| async move { connector.connect().await }) - .await?; + let t = self.connect_tunnel(connector).await?; self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint) .await } + pub(crate) async fn connect_tunnel(&self, mut connector: C) -> Result, Error> + where + C: TunnelConnector + Debug, + { + let ns = self.global_ctx.net_ns.clone(); + Ok(ns + .run_async(|| async move { connector.connect().await }) + .await?) + } + // avoid loop back to virtual network fn check_remote_addr_not_from_virtual_network( &self,