fix(connector): classify manual reconnect timeouts by stage

This commit is contained in:
fanyang89
2026-04-05 23:20:04 +08:00
committed by fanyang
parent 12a7b5a5c5
commit bc1b0d7f4b
2 changed files with 179 additions and 57 deletions
+165 -50
View File
@@ -1,6 +1,8 @@
use std::{ use std::{
collections::BTreeSet, collections::BTreeSet,
future::Future,
sync::{Arc, Weak}, sync::{Arc, Weak},
time::{Duration, Instant},
}; };
use dashmap::DashSet; use dashmap::DashSet;
@@ -16,7 +18,7 @@ use crate::{
}, },
rpc_types::{self, controller::BaseController}, rpc_types::{self, controller::BaseController},
}, },
tunnel::{IpVersion, TunnelConnector}, tunnel::{IpVersion, TunnelConnector, TunnelScheme, matches_scheme},
utils::weak_upgrade, utils::weak_upgrade,
}; };
@@ -83,6 +85,108 @@ impl ManualConnectorManager {
ret ret
} }
/// Picks the overall time budget for one manual reconnect attempt to `dead_url`.
///
/// URLs that `matches_scheme!` classifies as `Http`, `Https`, `Txt` or `Srv`,
/// as well as URLs whose scheme string is `ws` or `wss`, get 20 seconds.
/// Every other URL gets 2 seconds.
fn reconnect_timeout(dead_url: &url::Url) -> Duration {
    let is_slow_tunnel_scheme = matches_scheme!(
        dead_url,
        TunnelScheme::Http | TunnelScheme::Https | TunnelScheme::Txt | TunnelScheme::Srv
    );

    if is_slow_tunnel_scheme || matches!(dead_url.scheme(), "ws" | "wss") {
        Duration::from_secs(20)
    } else {
        Duration::from_secs(2)
    }
}
/// Returns how much of `total_timeout` is still unspent, measured from `started_at`.
///
/// Yields `None` once the elapsed time reaches or exceeds `total_timeout`, so a
/// caller never receives a zero-length budget.
fn remaining_budget(started_at: Instant, total_timeout: Duration) -> Option<Duration> {
    total_timeout
        .checked_sub(started_at.elapsed())
        .filter(|left| !left.is_zero())
}
/// Publishes a `GlobalCtxEvent::ConnectError` for a failed attempt on `dead_url`.
///
/// The event carries the URL as a string, the `Debug` rendering of
/// `ip_version`, and the pretty-printed (`{:#?}`) `Debug` rendering of `error`.
fn emit_connect_error(
    data: &ConnectorManagerData,
    dead_url: &url::Url,
    ip_version: IpVersion,
    error: &Error,
) {
    let url_text = dead_url.to_string();
    let ip_version_text = format!("{:?}", ip_version);
    let error_text = format!("{:#?}", error);

    let event = GlobalCtxEvent::ConnectError(url_text, ip_version_text, error_text);
    data.global_ctx.issue_event(event);
}
fn reconnect_timeout_error(stage: &str, duration: Duration) -> Error {
Error::AnyhowError(anyhow::anyhow!("{} timeout after {:?}", stage, duration))
}
/// Runs `fut` for at most whatever is left of a shared reconnect budget.
///
/// The budget is `total_timeout` counted from `started_at`. Two failure modes
/// are reported through `reconnect_timeout_error`, both tagged with `stage`:
///
/// * the budget is already spent before `fut` is polled — the error carries
///   the time elapsed since `started_at`;
/// * `fut` does not finish within the remaining budget — the error carries
///   that remaining budget.
///
/// Otherwise the result of `fut` (success or its own error) is returned as is.
async fn with_reconnect_timeout<T, F>(
    stage: &'static str,
    started_at: Instant,
    total_timeout: Duration,
    fut: F,
) -> Result<T, Error>
where
    F: Future<Output = Result<T, Error>>,
{
    // Nothing left to spend: fail without ever polling `fut`.
    let Some(budget) = Self::remaining_budget(started_at, total_timeout) else {
        return Err(Self::reconnect_timeout_error(stage, started_at.elapsed()));
    };

    match timeout(budget, fut).await {
        Ok(outcome) => outcome,
        Err(_) => Err(Self::reconnect_timeout_error(stage, budget)),
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A stage entered after the budget is already spent fails with an error
    /// that names the stage.
    #[tokio::test]
    async fn reconnect_timeout_reports_exhausted_budget_for_stage() {
        let budget = Duration::from_millis(1);
        let started_at = Instant::now() - Duration::from_millis(50);

        let outcome = ManualConnectorManager::with_reconnect_timeout(
            "resolve",
            started_at,
            budget,
            async { Ok::<(), Error>(()) },
        )
        .await;

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("resolve timeout after"));
    }

    /// A stage whose future outlives the remaining budget fails with an error
    /// that names the stage.
    #[tokio::test]
    async fn reconnect_timeout_reports_stage_timeout_with_remaining_budget() {
        let slow_stage = async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            Ok::<(), Error>(())
        };

        let outcome = ManualConnectorManager::with_reconnect_timeout(
            "handshake",
            Instant::now(),
            Duration::from_millis(10),
            slow_stage,
        )
        .await;

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("handshake timeout after"));
    }

    /// A future that completes within the budget has its value passed through.
    #[tokio::test]
    async fn reconnect_timeout_preserves_success_within_budget() {
        let outcome = ManualConnectorManager::with_reconnect_timeout(
            "connect",
            Instant::now(),
            Duration::from_millis(50),
            async { Ok::<_, Error>(123_u32) },
        )
        .await;

        let value = outcome.unwrap();
        assert_eq!(value, 123);
    }
}
impl ManualConnectorManager {
pub fn add_connector<T>(&self, connector: T) pub fn add_connector<T>(&self, connector: T)
where where
T: TunnelConnector + 'static, T: TunnelConnector + 'static,
@@ -242,11 +346,18 @@ impl ManualConnectorManager {
async fn conn_reconnect_with_ip_version( async fn conn_reconnect_with_ip_version(
data: Arc<ConnectorManagerData>, data: Arc<ConnectorManagerData>,
dead_url: String, dead_url: url::Url,
ip_version: IpVersion, ip_version: IpVersion,
started_at: Instant,
total_timeout: Duration,
) -> Result<ReconnResult, Error> { ) -> Result<ReconnResult, Error> {
let connector = let connector = Self::with_reconnect_timeout(
create_connector_by_url(&dead_url, &data.global_ctx.clone(), ip_version).await?; "resolve",
started_at,
total_timeout,
create_connector_by_url(dead_url.as_str(), &data.global_ctx, ip_version),
)
.await?;
data.global_ctx data.global_ctx
.issue_event(GlobalCtxEvent::Connecting(connector.remote_url())); .issue_event(GlobalCtxEvent::Connecting(connector.remote_url()));
@@ -257,10 +368,25 @@ impl ManualConnectorManager {
))); )));
}; };
let (peer_id, conn_id) = pm.try_direct_connect(connector).await?; let tunnel = Self::with_reconnect_timeout(
"connect",
started_at,
total_timeout,
pm.connect_tunnel(connector),
)
.await?;
let (peer_id, conn_id) = Self::with_reconnect_timeout(
"handshake",
started_at,
total_timeout,
pm.add_client_tunnel_with_peer_id_hint(tunnel, true, None),
)
.await?;
tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url);
Ok(ReconnResult { Ok(ReconnResult {
dead_url, dead_url: dead_url.to_string(),
peer_id, peer_id,
conn_id, conn_id,
}) })
@@ -273,22 +399,33 @@ impl ManualConnectorManager {
tracing::info!("reconnect: {}", dead_url); tracing::info!("reconnect: {}", dead_url);
let mut ip_versions = vec![]; let mut ip_versions = vec![];
if dead_url.scheme() == "ring" || dead_url.scheme() == "txt" || dead_url.scheme() == "srv" { if matches_scheme!(
dead_url,
TunnelScheme::Ring | TunnelScheme::Txt | TunnelScheme::Srv
) {
ip_versions.push(IpVersion::Both); ip_versions.push(IpVersion::Both);
} else { } else {
let converted_dead_url = crate::common::idn::convert_idn_to_ascii(dead_url.clone())?; let converted_dead_url =
let addrs = match socket_addrs(&converted_dead_url, || Some(1000)).await { match crate::common::idn::convert_idn_to_ascii(dead_url.clone()) {
Ok(url) => url,
Err(error) => {
let error: Error = error.into();
Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error);
return Err(error);
}
};
let addrs = match Self::with_reconnect_timeout(
"resolve",
Instant::now(),
Self::reconnect_timeout(&dead_url),
socket_addrs(&converted_dead_url, || Some(1000)),
)
.await
{
Ok(addrs) => addrs, Ok(addrs) => addrs,
Err(e) => { Err(error) => {
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error);
dead_url.to_string(), return Err(error);
format!("{:?}", IpVersion::Both),
format!("{:?}", e),
));
return Err(Error::AnyhowError(anyhow::anyhow!(
"get ip from url failed: {:?}",
e
)));
} }
}; };
tracing::info!(?addrs, ?dead_url, "get ip from url done"); tracing::info!(?addrs, ?dead_url, "get ip from url done");
@@ -313,46 +450,24 @@ impl ManualConnectorManager {
"cannot get ip from url" "cannot get ip from url"
))); )));
for ip_version in ip_versions { for ip_version in ip_versions {
let use_long_timeout = dead_url.scheme() == "http" let started_at = Instant::now();
|| dead_url.scheme() == "https" let ret = Self::conn_reconnect_with_ip_version(
|| dead_url.scheme() == "ws"
|| dead_url.scheme() == "wss"
|| dead_url.scheme() == "txt"
|| dead_url.scheme() == "srv";
let ret = timeout(
// allow http/websocket connector to wait longer
std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }),
Self::conn_reconnect_with_ip_version(
data.clone(), data.clone(),
dead_url.to_string(), dead_url.clone(),
ip_version, ip_version,
), started_at,
Self::reconnect_timeout(&dead_url),
) )
.await; .await;
tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret); tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret);
match ret { match ret {
Ok(Ok(_)) => { Ok(result) => return Ok(result),
// 外层和内层都成功:解包并跳出 Err(error) => {
reconn_ret = ret.unwrap(); Self::emit_connect_error(&data, &dead_url, ip_version, &error);
break; reconn_ret = Err(error);
}
Ok(Err(e)) => {
// 外层成功,内层失败
reconn_ret = Err(e);
}
Err(e) => {
// 外层失败
reconn_ret = Err(e.into());
} }
} }
// 发送事件(只有在未 break 时才执行)
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
dead_url.to_string(),
format!("{:?}", ip_version),
format!("{:?}", reconn_ret),
));
} }
reconn_ret reconn_ret
+12 -5
View File
@@ -636,20 +636,27 @@ impl PeerManager {
#[tracing::instrument] #[tracing::instrument]
pub async fn try_direct_connect_with_peer_id_hint<C>( pub async fn try_direct_connect_with_peer_id_hint<C>(
&self, &self,
mut connector: C, connector: C,
peer_id_hint: Option<PeerId>, peer_id_hint: Option<PeerId>,
) -> Result<(PeerId, PeerConnId), Error> ) -> Result<(PeerId, PeerConnId), Error>
where where
C: TunnelConnector + Debug, C: TunnelConnector + Debug,
{ {
let ns = self.global_ctx.net_ns.clone(); let t = self.connect_tunnel(connector).await?;
let t = ns
.run_async(|| async move { connector.connect().await })
.await?;
self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint) self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint)
.await .await
} }
/// Establishes the tunnel for `connector` by running `connector.connect()`
/// through `run_async` on the global context's `net_ns`.
///
/// Returns the connected tunnel; a failure is converted into `Error` by `?`.
pub(crate) async fn connect_tunnel<C>(&self, mut connector: C) -> Result<Box<dyn Tunnel>, Error>
where
    C: TunnelConnector + Debug,
{
    let net_ns = self.global_ctx.net_ns.clone();
    let tunnel = net_ns
        .run_async(|| async move { connector.connect().await })
        .await?;
    Ok(tunnel)
}
// avoid loop back to virtual network // avoid loop back to virtual network
fn check_remote_addr_not_from_virtual_network( fn check_remote_addr_not_from_virtual_network(
&self, &self,