Compare commits

...

3 Commits

Author SHA1 Message Date
Luna Yao 79b562cdc9 drop peer_mgr in time (#2064) 2026-04-06 11:31:05 +08:00
fanyang e3f089251c fix(ospf): mitigate route sync storm under connection flapping (#2063)
Addresses issue #2016 where nodes behind unstable networks
(e.g. campus firewalls) cause excessive traffic that can freeze
the remote node.

Two changes in peer_ospf_route.rs:

- Make do_sync_route_info only trigger reverse sync_now when
  incoming data actually changed the route table or foreign
  network state.  The previous unconditional sync_now created
  an A->B->A->B ping-pong cycle on every RPC exchange.

- Add exponential backoff (50ms..5s) to session_task retry loop.
  The previous fixed 50ms retry produced ~20 RPCs/s during
  sustained network instability.
2026-04-06 11:26:20 +08:00
fanyang cf6dcbc054 Fix IPv6 TCP tunnel display formatting (#1980)
Normalize composite tunnel display values before rendering peer and
debug output so IPv6 tunnel types no longer append `6` to the port.

- Preserve prefixes like `txt-` while converting tunnel schemes to
  their IPv6 form.
- Recover malformed values such as `txt-tcp://...:110106` into
  `txt-tcp6://...:11010`.
- Reuse the normalized remote address display in CLI debug output.
2026-04-05 22:12:55 +08:00
18 changed files with 445 additions and 74 deletions
+5 -5
View File
@@ -194,11 +194,11 @@ impl super::TunnelConnector for DnsTunnelConnector {
TunnelInfo {
local_addr: info.local_addr.clone(),
remote_addr: Some(self.addr.clone().into()),
tunnel_type: format!(
"{}-{}",
self.addr.scheme(),
info.remote_addr.unwrap_or_default()
),
resolved_remote_addr: info
.resolved_remote_addr
.clone()
.or(info.remote_addr.clone()),
tunnel_type: format!("{}-{}", self.addr.scheme(), info.tunnel_type),
},
)))
}
+7 -5
View File
@@ -229,11 +229,11 @@ impl super::TunnelConnector for HttpTunnelConnector {
TunnelInfo {
local_addr: info.local_addr.clone(),
remote_addr: Some(self.addr.clone().into()),
tunnel_type: format!(
"{:?}-{}",
self.redirect_type,
info.remote_addr.unwrap_or_default()
),
resolved_remote_addr: info
.resolved_remote_addr
.clone()
.or(info.remote_addr.clone()),
tunnel_type: format!("{}-{}", self.addr.scheme(), info.tunnel_type),
},
)))
}
@@ -353,6 +353,8 @@ mod tests {
let info = t.info().unwrap();
let remote_addr = info.remote_addr.unwrap();
assert_eq!(remote_addr, test_url.into());
let resolved_remote_addr = info.resolved_remote_addr.unwrap();
assert_eq!(resolved_remote_addr.url, "tcp://127.0.0.1:25888");
tokio::join!(task).0.unwrap();
}
+1 -1
View File
@@ -1404,7 +1404,7 @@ impl<'a> CommandHandler<'a> {
"remote_addr: {}, rx_bytes: {}, tx_bytes: {}, latency_us: {}",
conn.tunnel
.as_ref()
.map(|t| t.remote_addr.clone().unwrap_or_default())
.and_then(|t| t.display_remote_addr())
.unwrap_or_default(),
conn.stats.as_ref().map(|s| s.rx_bytes).unwrap_or_default(),
conn.stats.as_ref().map(|s| s.tx_bytes).unwrap_or_default(),
@@ -232,6 +232,7 @@ async fn test_magic_dns_update_replaces_records_for_same_client() {
remote_addr: Some(crate::proto::common::Url {
url: "tcp://127.0.0.1:54321".to_string(),
}),
resolved_remote_addr: None,
}));
dns_server_inst
@@ -299,6 +300,7 @@ async fn test_magic_dns_update_replaces_records_for_same_client() {
remote_addr: Some(crate::proto::common::Url {
url: "tcp://127.0.0.1:54321".to_string(),
}),
resolved_remote_addr: None,
}));
dns_server_inst
+38 -36
View File
@@ -867,46 +867,48 @@ impl Instance {
tokio::spawn(async move {
let mut output_tx = Some(first_round_output);
loop {
let Some(peer_manager) = peer_mgr.upgrade() else {
tracing::warn!("peer manager is dropped, stop static ip check.");
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Err(Error::Unknown));
return;
}
return;
};
let close_notifier = Arc::new(Notify::new());
let mut new_nic_ctx = NicCtx::new(
peer_manager.get_global_ctx(),
&peer_manager,
peer_packet_receiver.clone(),
close_notifier.clone(),
);
if let Err(e) = new_nic_ctx.run(ipv4_addr, ipv6_addr).await {
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Err(e));
return;
}
tracing::error!("failed to create new nic ctx, err: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
// Create Magic DNS runner only if we have IPv4
#[cfg(feature = "magic-dns")]
{
let ifname = new_nic_ctx.ifname().await;
let dns_runner = if let Some(ipv4) = ipv4_addr {
Self::create_magic_dns_runner(peer_manager, ifname, ipv4)
} else {
None
let Some(peer_mgr) = peer_mgr.upgrade() else {
tracing::warn!("peer manager is dropped, stop static ip check.");
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Err(Error::Unknown));
return;
}
return;
};
Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx, dns_runner).await;
let mut new_nic_ctx = NicCtx::new(
peer_mgr.get_global_ctx(),
&peer_mgr,
peer_packet_receiver.clone(),
close_notifier.clone(),
);
if let Err(e) = new_nic_ctx.run(ipv4_addr, ipv6_addr).await {
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Err(e));
return;
}
tracing::error!("failed to create new nic ctx, err: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
// Create Magic DNS runner only if we have IPv4
#[cfg(feature = "magic-dns")]
{
let ifname = new_nic_ctx.ifname().await;
let dns_runner = if let Some(ipv4) = ipv4_addr {
Self::create_magic_dns_runner(peer_mgr, ifname, ipv4)
} else {
None
};
Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx, dns_runner).await;
}
#[cfg(not(feature = "magic-dns"))]
Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx).await;
}
#[cfg(not(feature = "magic-dns"))]
Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx).await;
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Ok(()));
+30 -6
View File
@@ -659,7 +659,8 @@ impl SyncedRouteInfo {
}
}
fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) {
fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) -> bool {
let mut changed = false;
for item in foreign_network.infos.iter().map(Clone::clone) {
let Some(key) = item.key else {
continue;
@@ -675,10 +676,15 @@ impl SyncedRouteInfo {
.and_modify(|old_entry| {
if entry.version > old_entry.version {
*old_entry = entry.clone();
changed = true;
}
})
.or_insert_with(|| entry.clone());
.or_insert_with(|| {
changed = true;
entry.clone()
});
}
changed
}
fn update_my_peer_info(
@@ -2847,8 +2853,14 @@ impl RouteSessionManager {
dst_peer_id: PeerId,
mut sync_now: tokio::sync::broadcast::Receiver<()>,
) {
const RETRY_BASE_MS: u64 = 50;
const RETRY_MAX_MS: u64 = 5000;
let mut last_sync = Instant::now();
let mut last_clean_dst_saved_map = Instant::now();
// Keep retry_delay_ms across outer iterations so that rapid
// connect/disconnect flaps don't fully reset the backoff.
let mut retry_delay_ms = RETRY_BASE_MS;
loop {
loop {
let Some(service_impl) = service_impl.clone().upgrade() else {
@@ -2875,13 +2887,18 @@ impl RouteSessionManager {
last_clean_dst_saved_map = Instant::now();
service_impl.clean_dst_saved_map(dst_peer_id);
}
// Successful sync: decay backoff towards base so the next
// real failure still starts at a reasonable level, but
// don't fully reset to avoid 50ms bursts during flapping.
retry_delay_ms = (retry_delay_ms / 2).max(RETRY_BASE_MS);
break;
}
drop(service_impl);
drop(peer_rpc);
tokio::time::sleep(Duration::from_millis(50)).await;
tokio::time::sleep(Duration::from_millis(retry_delay_ms)).await;
retry_delay_ms = (retry_delay_ms * 2).min(RETRY_MAX_MS);
}
sync_now = sync_now.resubscribe();
@@ -3214,17 +3231,18 @@ impl RouteSessionManager {
service_impl.update_route_table_and_cached_local_conn_bitmap();
}
let mut foreign_network_changed = false;
if let Some(foreign_network) = &foreign_network {
// Step 9b: credential peers' foreign_network_infos are always ignored
if !from_is_credential {
service_impl
foreign_network_changed = service_impl
.synced_route_info
.update_foreign_network(foreign_network);
session.update_dst_saved_foreign_network_version(foreign_network, from_peer_id);
}
}
if need_update_route_table || foreign_network.is_some() {
if need_update_route_table || foreign_network_changed {
service_impl.update_foreign_network_owner_map();
}
@@ -3243,7 +3261,13 @@ impl RouteSessionManager {
.disconnect_untrusted_peers(&untrusted_peers)
.await;
self.sync_now("sync_route_info");
// Only trigger reverse sync when we actually received new data that
// needs to be propagated to other peers. Previously this was
// unconditional, which created an A→B→A→B ping-pong storm even when
// there was nothing new to propagate.
if need_update_route_table || foreign_network_changed {
self.sync_now("sync_route_info");
}
Ok(SyncRouteInfoResponse {
is_initiator,
+1 -16
View File
@@ -149,23 +149,8 @@ pub mod instance {
ret
}
fn is_tunnel_ipv6(tunnel_info: &super::super::common::TunnelInfo) -> bool {
let Some(local_addr) = &tunnel_info.local_addr else {
return false;
};
let u: url::Url = local_addr.clone().into();
u.host()
.map(|h| matches!(h, url::Host::Ipv6(_)))
.unwrap_or(false)
}
fn get_tunnel_proto_str(tunnel_info: &super::super::common::TunnelInfo) -> String {
if Self::is_tunnel_ipv6(tunnel_info) {
format!("{}6", tunnel_info.tunnel_type)
} else {
tunnel_info.tunnel_type.clone()
}
tunnel_info.display_tunnel_type()
}
pub fn get_conn_protos(&self) -> Option<Vec<String>> {
+1
View File
@@ -201,6 +201,7 @@ message TunnelInfo {
string tunnel_type = 1;
common.Url local_addr = 2;
common.Url remote_addr = 3;
common.Url resolved_remote_addr = 4;
}
message StunInfo {
+279 -1
View File
@@ -5,8 +5,9 @@ use std::{
use anyhow::Context;
use base64::{prelude::BASE64_STANDARD, Engine as _};
use strum::VariantArray;
use crate::tunnel::packet_def::CompressorAlgo;
use crate::tunnel::{packet_def::CompressorAlgo, IpScheme};
include!(concat!(env!("OUT_DIR"), "/common.rs"));
@@ -284,6 +285,105 @@ impl fmt::Display for Url {
}
}
fn split_tunnel_scheme(raw_scheme: &str) -> Option<(&str, &'static str, bool)> {
for scheme in IpScheme::VARIANTS {
let scheme: &'static str = scheme.into();
if let Some(base) = raw_scheme.strip_suffix('6') {
if let Some(prefix) = base.strip_suffix(scheme) {
if prefix.is_empty() || prefix.ends_with('-') {
return Some((prefix, scheme, true));
}
}
}
if let Some(prefix) = raw_scheme.strip_suffix(scheme) {
if prefix.is_empty() || prefix.ends_with('-') {
return Some((prefix, scheme, false));
}
}
}
None
}
fn normalize_tunnel_scheme(raw_scheme: &str, is_ipv6: bool) -> Option<String> {
let (prefix, scheme, had_ipv6_suffix) = split_tunnel_scheme(raw_scheme)?;
let suffix = if is_ipv6 || had_ipv6_suffix { "6" } else { "" };
Some(format!("{prefix}{scheme}{suffix}"))
}
fn infer_tunnel_ipv6(raw: &str) -> Option<bool> {
let (_, rest) = raw.split_once("://")?;
if rest.starts_with('[') {
return Some(true);
}
match url::Url::parse(raw).ok()?.host() {
Some(url::Host::Ipv4(_)) => Some(false),
Some(url::Host::Ipv6(_)) => Some(true),
Some(url::Host::Domain(_)) | None => None,
}
}
fn normalize_tunnel_port(raw_port: &str, is_ipv6: bool) -> Option<u16> {
if let Ok(port) = raw_port.parse::<u16>() {
return Some(port);
}
if is_ipv6 && raw_port.ends_with('6') {
return raw_port[..raw_port.len() - 1].parse::<u16>().ok();
}
None
}
fn normalize_tunnel_url(raw: &str, fallback_ipv6: Option<bool>) -> Option<String> {
let (raw_scheme, rest) = raw.split_once("://")?;
if let Some(rest) = rest.strip_prefix('[') {
let (host, remainder) = rest.split_once(']')?;
let scheme = normalize_tunnel_scheme(raw_scheme, true)?;
if remainder.is_empty() {
return Some(format!("{scheme}://[{host}]"));
}
let raw_port = remainder.strip_prefix(':')?;
let port = normalize_tunnel_port(raw_port, true)?;
return Some(format!("{scheme}://[{host}]:{port}"));
}
let is_ipv6 = infer_tunnel_ipv6(raw).or(fallback_ipv6).unwrap_or(false);
let scheme = normalize_tunnel_scheme(raw_scheme, is_ipv6)?;
if let Ok(url) = url::Url::parse(raw) {
let host = match url.host()? {
url::Host::Ipv4(host) => host.to_string(),
url::Host::Ipv6(host) => format!("[{host}]"),
url::Host::Domain(host) => host.to_string(),
};
return Some(match url.port_or_known_default() {
Some(port) => format!("{scheme}://{host}:{port}"),
None => format!("{scheme}://{host}"),
});
}
let (host, raw_port) = rest.rsplit_once(':')?;
let port = normalize_tunnel_port(raw_port, is_ipv6)?;
Some(format!("{scheme}://{host}:{port}"))
}
impl Url {
pub fn is_ipv6_tunnel_endpoint(&self) -> bool {
infer_tunnel_ipv6(&self.url).unwrap_or(false)
}
pub fn normalized_tunnel_display(&self) -> String {
normalize_tunnel_url(&self.url, None).unwrap_or_else(|| self.url.clone())
}
}
impl From<std::net::SocketAddr> for SocketAddr {
fn from(value: std::net::SocketAddr) -> Self {
match value {
@@ -325,6 +425,38 @@ impl Display for SocketAddr {
}
}
impl TunnelInfo {
pub fn effective_remote_addr(&self) -> Option<&Url> {
self.resolved_remote_addr
.as_ref()
.or(self.remote_addr.as_ref())
}
pub fn display_tunnel_type(&self) -> String {
let is_ipv6 = infer_tunnel_ipv6(&self.tunnel_type).or_else(|| {
self.resolved_remote_addr
.as_ref()
.or(self.local_addr.as_ref())
.or(self.remote_addr.as_ref())
.map(Url::is_ipv6_tunnel_endpoint)
});
if self.tunnel_type.contains("://") {
normalize_tunnel_url(&self.tunnel_type, is_ipv6)
.unwrap_or_else(|| self.tunnel_type.clone())
} else {
is_ipv6
.and_then(|is_ipv6| normalize_tunnel_scheme(&self.tunnel_type, is_ipv6))
.unwrap_or_else(|| self.tunnel_type.clone())
}
}
pub fn display_remote_addr(&self) -> Option<String> {
self.effective_remote_addr()
.map(Url::normalized_tunnel_display)
}
}
impl TryFrom<CompressionAlgoPb> for CompressorAlgo {
type Error = anyhow::Error;
@@ -397,3 +529,149 @@ impl SecureModeConfig {
Ok(x25519_dalek::PublicKey::from(k))
}
}
#[cfg(test)]
mod tests {
use super::{normalize_tunnel_url, TunnelInfo, Url};
fn assert_ipv6_tunnel_normalization(scheme: &str, port: u16) {
let expected = format!("{scheme}6://[2001:db8::1]:{port}");
assert_eq!(
normalize_tunnel_url(&format!("{scheme}://[2001:db8::1]:{port}"), None).as_deref(),
Some(expected.as_str())
);
}
#[test]
fn normalize_plain_ipv6_tunnel_url() {
let url = Url {
url: "tcp://[2001:db8::1]:11010".to_string(),
};
assert_eq!(
url.normalized_tunnel_display(),
"tcp6://[2001:db8::1]:11010"
);
assert!(url.is_ipv6_tunnel_endpoint());
}
#[test]
fn normalize_all_enabled_ipv6_tunnel_urls() {
assert_ipv6_tunnel_normalization("tcp", 11010);
assert_ipv6_tunnel_normalization("udp", 11010);
#[cfg(feature = "wireguard")]
assert_ipv6_tunnel_normalization("wg", 11011);
#[cfg(feature = "quic")]
assert_ipv6_tunnel_normalization("quic", 11012);
#[cfg(feature = "websocket")]
assert_ipv6_tunnel_normalization("ws", 80);
#[cfg(feature = "websocket")]
assert_ipv6_tunnel_normalization("wss", 443);
#[cfg(feature = "faketcp")]
assert_ipv6_tunnel_normalization("faketcp", 11013);
}
#[test]
fn normalize_composite_ipv6_tunnel_url() {
assert_eq!(
normalize_tunnel_url("txt-tcp://[2001:db8::1]:11010", None).as_deref(),
Some("txt-tcp6://[2001:db8::1]:11010")
);
}
#[test]
fn recover_malformed_composite_ipv6_tunnel_url() {
assert_eq!(
normalize_tunnel_url("txt-tcp://[2001:db8::1]:110106", None).as_deref(),
Some("txt-tcp6://[2001:db8::1]:11010")
);
}
#[test]
fn keep_normalized_ipv6_tunnel_url_stable() {
assert_eq!(
normalize_tunnel_url("tcp6://[2001:db8::1]:11010", None).as_deref(),
Some("tcp6://[2001:db8::1]:11010")
);
}
#[test]
fn normalize_ipv6_tunnel_url_without_explicit_port() {
assert_eq!(
normalize_tunnel_url("tcp://[2001:db8::1]", None).as_deref(),
Some("tcp6://[2001:db8::1]")
);
}
#[test]
fn keep_domain_host_unbracketed_when_ipv6_falls_back() {
assert_eq!(
normalize_tunnel_url("tcp://localhost:11010", Some(true)).as_deref(),
Some("tcp6://localhost:11010")
);
}
#[test]
fn tunnel_info_display_tunnel_type_preserves_composite_prefix() {
let tunnel = TunnelInfo {
tunnel_type: "txt-tcp://[2001:db8::2]:110106".to_string(),
local_addr: None,
remote_addr: Some(Url {
url: "txt://et.example.com".to_string(),
}),
resolved_remote_addr: None,
};
assert_eq!(
tunnel.display_tunnel_type(),
"txt-tcp6://[2001:db8::2]:11010"
);
}
#[test]
fn tunnel_info_display_tunnel_type_uses_remote_addr_fallback() {
let tunnel = TunnelInfo {
tunnel_type: "tcp".to_string(),
local_addr: None,
remote_addr: Some(Url {
url: "tcp://[2001:db8::2]:11010".to_string(),
}),
resolved_remote_addr: None,
};
assert_eq!(tunnel.display_tunnel_type(), "tcp6");
assert_eq!(
tunnel.display_remote_addr().as_deref(),
Some("tcp6://[2001:db8::2]:11010")
);
}
#[test]
fn tunnel_info_prefers_resolved_remote_addr() {
let tunnel = TunnelInfo {
tunnel_type: "txt-tcp".to_string(),
local_addr: None,
remote_addr: Some(Url {
url: "txt://et.example.com".to_string(),
}),
resolved_remote_addr: Some(Url {
url: "tcp://[2001:db8::3]:11010".to_string(),
}),
};
assert_eq!(tunnel.display_tunnel_type(), "txt-tcp6");
assert_eq!(
tunnel.display_remote_addr().as_deref(),
Some("tcp6://[2001:db8::3]:11010")
);
assert_eq!(
tunnel.effective_remote_addr().map(|url| url.url.as_str()),
Some("tcp://[2001:db8::3]:11010")
);
}
}
+11
View File
@@ -249,6 +249,13 @@ impl TunnelListener for FakeTcpTunnelListener {
)
.into(),
),
resolved_remote_addr: Some(
crate::tunnel::build_url_from_socket_addr(
&socket.remote_addr().to_string(),
"faketcp",
)
.into(),
),
};
// We treat the fake tcp socket as a datagram tunnel directly
@@ -366,6 +373,10 @@ impl TunnelConnector for FakeTcpTunnelConnector {
.into(),
),
remote_addr: Some(self.addr.clone().into()),
resolved_remote_addr: Some(
crate::tunnel::build_url_from_socket_addr(&remote_addr.to_string(), "faketcp")
.into(),
),
};
let socket = Arc::new(socket);
+2 -2
View File
@@ -11,7 +11,7 @@ use derive_more::{From, TryInto};
use futures::{Sink, Stream};
use socket2::Protocol;
use std::fmt::Debug;
use strum::{Display, EnumString, VariantArray};
use strum::{Display, EnumString, IntoStaticStr, VariantArray};
use tokio::time::error::Elapsed;
use self::packet_def::ZCPacket;
@@ -284,7 +284,7 @@ struct IpSchemeAttributes {
port_offset: u16,
}
#[derive(Debug, Clone, Copy, PartialEq, Display, EnumString, VariantArray)]
#[derive(Debug, Clone, Copy, PartialEq, Display, EnumString, IntoStaticStr, VariantArray)]
#[strum(serialize_all = "lowercase")]
pub enum IpScheme {
Tcp,
+7
View File
@@ -175,6 +175,9 @@ impl QuicTunnelListener {
remote_addr: Some(
super::build_url_from_socket_addr(&remote_addr.to_string(), "quic").into(),
),
resolved_remote_addr: Some(
super::build_url_from_socket_addr(&remote_addr.to_string(), "quic").into(),
),
};
Ok(Box::new(TunnelWrapper::new(
@@ -280,6 +283,10 @@ impl TunnelConnector for QuicTunnelConnector {
super::build_url_from_socket_addr(&local_addr.to_string(), "quic").into(),
),
remote_addr: Some(self.addr.clone().into()),
resolved_remote_addr: Some(
super::build_url_from_socket_addr(&connection.remote_address().to_string(), "quic")
.into(),
),
};
let arc_conn = Arc::new(ConnWrapper { conn: connection });
+6
View File
@@ -214,6 +214,9 @@ fn get_tunnel_for_client(conn: Arc<Connection>) -> impl Tunnel {
tunnel_type: "ring".to_owned(),
local_addr: Some(build_url_from_socket_addr(&conn.client.id.into(), "ring").into()),
remote_addr: Some(build_url_from_socket_addr(&conn.server.id.into(), "ring").into()),
resolved_remote_addr: Some(
build_url_from_socket_addr(&conn.server.id.into(), "ring").into(),
),
}),
)
}
@@ -226,6 +229,9 @@ fn get_tunnel_for_server(conn: Arc<Connection>) -> impl Tunnel {
tunnel_type: "ring".to_owned(),
local_addr: Some(build_url_from_socket_addr(&conn.server.id.into(), "ring").into()),
remote_addr: Some(build_url_from_socket_addr(&conn.client.id.into(), "ring").into()),
resolved_remote_addr: Some(
build_url_from_socket_addr(&conn.client.id.into(), "ring").into(),
),
}),
)
}
+34
View File
@@ -41,6 +41,9 @@ impl TcpTunnelListener {
remote_addr: Some(
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
),
resolved_remote_addr: Some(
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
),
};
let (r, w) = stream.into_split();
@@ -117,6 +120,9 @@ fn get_tunnel_with_tcp_stream(
super::build_url_from_socket_addr(&stream.local_addr()?.to_string(), "tcp").into(),
),
remote_addr: Some(remote_url.into()),
resolved_remote_addr: Some(
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
),
};
let (r, w) = stream.into_split();
@@ -277,6 +283,34 @@ mod tests {
_tunnel_pingpong(listener, connector).await;
}
#[tokio::test]
async fn connector_keeps_source_addr_and_reports_resolved_addr() {
let mut listener = TcpTunnelListener::new("tcp://127.0.0.1:0".parse().unwrap());
listener.listen().await.unwrap();
let port = listener.local_url().port().unwrap();
let source_url: url::Url = format!("tcp://localhost:{port}").parse().unwrap();
let mut connector = TcpTunnelConnector::new(source_url.clone());
connector.set_ip_version(IpVersion::V4);
let accept_task = tokio::spawn(async move { listener.accept().await.unwrap() });
let tunnel = connector.connect().await.unwrap();
let accepted_tunnel = accept_task.await.unwrap();
let info = tunnel.info().unwrap();
assert_eq!(info.remote_addr.unwrap().url, source_url.to_string());
let resolved_remote_addr: url::Url = info.resolved_remote_addr.unwrap().into();
assert_eq!(resolved_remote_addr.host_str(), Some("127.0.0.1"));
assert_eq!(resolved_remote_addr.port(), Some(port));
let accepted_info = accepted_tunnel.info().unwrap();
assert_eq!(
accepted_info.remote_addr,
accepted_info.resolved_remote_addr,
);
}
#[tokio::test]
async fn test_alloc_port() {
// v4
+6
View File
@@ -428,6 +428,9 @@ impl UdpTunnelListenerData {
remote_addr: Some(
build_url_from_socket_addr(&remote_addr.to_string(), "udp").into(),
),
resolved_remote_addr: Some(
build_url_from_socket_addr(&remote_addr.to_string(), "udp").into(),
),
}),
));
@@ -772,6 +775,9 @@ impl UdpTunnelConnector {
build_url_from_socket_addr(&socket.local_addr()?.to_string(), "udp").into(),
),
remote_addr: Some(self.addr.clone().into()),
resolved_remote_addr: Some(
build_url_from_socket_addr(&dst_addr.to_string(), "udp").into(),
),
}),
)))
}
+3 -1
View File
@@ -43,7 +43,8 @@ impl UnixSocketTunnelListener {
let info = TunnelInfo {
tunnel_type: "unix".to_owned(),
local_addr: Some(self.local_url().into()),
remote_addr: remote_addr.map(Into::into),
remote_addr: remote_addr.clone().map(Into::into),
resolved_remote_addr: remote_addr.map(Into::into),
};
let (r, w) = stream.into_split();
@@ -122,6 +123,7 @@ impl super::TunnelConnector for UnixSocketTunnelConnector {
tunnel_type: "unix".to_owned(),
local_addr: local_addr.map(Into::into),
remote_addr: Some(self.addr.clone().into()),
resolved_remote_addr: Some(self.addr.clone().into()),
};
let (r, w) = stream.into_split();
+6 -1
View File
@@ -143,11 +143,13 @@ impl WsTunnelListener {
}
let (write, read) = stream.split();
let remote_addr: crate::proto::common::Url = remote_addr.into();
let info = TunnelInfo {
tunnel_type: self.addr.scheme().to_owned(),
local_addr: Some(self.local_url().into()),
remote_addr: Some(remote_addr.into()),
remote_addr: Some(remote_addr.clone()),
resolved_remote_addr: Some(remote_addr),
};
Ok(Box::new(TunnelWrapper::new(
@@ -235,6 +237,9 @@ impl WsTunnelConnector {
.into(),
),
remote_addr: Some(addr.clone().into()),
resolved_remote_addr: Some(
super::build_url_from_socket_addr(&socket_addr.to_string(), addr.scheme()).into(),
),
};
let c = ClientBuilder::from_uri(http::Uri::try_from(addr.to_string()).unwrap());
+6
View File
@@ -538,6 +538,9 @@ impl WgTunnelListener {
remote_addr: Some(
build_url_from_socket_addr(&addr.to_string(), "wg").into(),
),
resolved_remote_addr: Some(
build_url_from_socket_addr(&addr.to_string(), "wg").into(),
),
}),
));
if let Err(e) = conn_sender.send(tunnel) {
@@ -685,6 +688,9 @@ impl WgTunnelConnector {
tunnel_type: "wg".to_owned(),
local_addr: Some(super::build_url_from_socket_addr(&local_addr, "wg").into()),
remote_addr: Some(addr_url.into()),
resolved_remote_addr: Some(
super::build_url_from_socket_addr(&addr.to_string(), "wg").into(),
),
}),
Some(Box::new(wg_peer)),
));