fix: avoid panic when validating mapped listeners (#2153)

This commit is contained in:
datayurei
2026-04-25 17:45:57 +08:00
committed by GitHub
parent 5a1668c753
commit af6b6ab6f1
6 changed files with 297 additions and 52 deletions
+80 -7
View File
@@ -51,6 +51,19 @@ pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
static TESTING: AtomicBool = AtomicBool::new(false);
fn mapped_listener_port(url: &url::Url) -> Option<u16> {
url.port().or_else(|| {
TunnelScheme::try_from(url)
.ok()
.and_then(|scheme| IpScheme::try_from(scheme).ok())
.map(IpScheme::default_port)
})
}
async fn resolve_mapped_listener_addrs(listener: &url::Url) -> Result<Vec<SocketAddr>, Error> {
socket_addrs(listener, || mapped_listener_port(listener)).await
}
#[async_trait::async_trait]
pub trait PeerManagerForDirectConnector {
async fn list_peers(&self) -> Vec<PeerId>;
@@ -132,7 +145,7 @@ impl DirectConnectorManagerData {
}
let global_ctx = self.peer_manager.get_global_ctx();
let listener_port = remote_url.port().ok_or(anyhow::anyhow!(
let listener_port = mapped_listener_port(remote_url).ok_or(anyhow::anyhow!(
"failed to parse port from remote url: {}",
remote_url
))?;
@@ -382,7 +395,7 @@ impl DirectConnectorManagerData {
listener: &url::Url,
tasks: &mut JoinSet<Result<(), Error>>,
) {
let Ok(mut addrs) = socket_addrs(listener, || None).await else {
let Ok(mut addrs) = resolve_mapped_listener_addrs(listener).await else {
tracing::error!(?listener, "failed to parse socket address from listener");
return;
};
@@ -536,7 +549,7 @@ impl DirectConnectorManagerData {
.into_iter()
.map(Into::<url::Url>::into)
.filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None })
.filter(|l| l.port().is_some() && l.host().is_some())
.filter(|l| mapped_listener_port(l).is_some() && l.host().is_some())
.filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_)))
.collect::<Vec<_>>();
@@ -762,6 +775,17 @@ impl DirectConnectorManager {
pub fn run_as_client(&mut self) {
self.client.start();
}
#[cfg(test)]
pub(crate) async fn try_direct_connect_with_ip_list(
&self,
dst_peer_id: PeerId,
ip_list: GetIpListResponse,
) -> Result<(), Error> {
self.data
.do_try_direct_connect_internal(dst_peer_id, ip_list)
.await
}
}
#[cfg(test)]
@@ -780,10 +804,53 @@ mod tests {
proto::peer_rpc::GetIpListResponse,
};
use super::TESTING;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::{TESTING, mapped_listener_port, resolve_mapped_listener_addrs};
#[test]
fn mapped_listener_port_uses_ip_scheme_defaults() {
assert_eq!(
mapped_listener_port(&"ws://example.com".parse().unwrap()),
Some(80)
);
assert_eq!(
mapped_listener_port(&"wss://example.com".parse().unwrap()),
Some(443)
);
assert_eq!(
mapped_listener_port(&"tcp://127.0.0.1".parse().unwrap()),
Some(11010)
);
assert_eq!(
mapped_listener_port(&"udp://127.0.0.1".parse().unwrap()),
Some(11010)
);
}
#[tokio::test]
async fn direct_connector_mapped_listener() {
async fn resolve_mapped_listener_addrs_uses_default_ports() {
let wss_addrs = resolve_mapped_listener_addrs(&"wss://127.0.0.1".parse().unwrap())
.await
.unwrap();
assert_eq!(
wss_addrs,
vec![SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443)]
);
let tcp_addrs = resolve_mapped_listener_addrs(&"tcp://127.0.0.1".parse().unwrap())
.await
.unwrap();
assert_eq!(
tcp_addrs,
vec![SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 11010)]
);
}
async fn run_direct_connector_mapped_listener_test(
mapped_listener: &str,
target_listener: &str,
) {
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
@@ -802,11 +869,11 @@ mod tests {
p_c.get_global_ctx()
.config
.set_mapped_listeners(Some(vec!["tcp://127.0.0.1:11334".parse().unwrap()]));
.set_mapped_listeners(Some(vec![mapped_listener.parse().unwrap()]));
p_x.get_global_ctx()
.config
.set_listeners(vec!["tcp://0.0.0.0:11334".parse().unwrap()]);
.set_listeners(vec![target_listener.parse().unwrap()]);
let mut lis_x = ListenerManager::new(p_x.get_global_ctx(), p_x.clone());
lis_x.prepare_listeners().await.unwrap();
lis_x.run().await.unwrap();
@@ -823,6 +890,12 @@ mod tests {
.unwrap();
}
#[tokio::test]
async fn direct_connector_mapped_listener() {
run_direct_connector_mapped_listener_test("tcp://127.0.0.1:11334", "tcp://0.0.0.0:11334")
.await;
}
#[rstest::rstest]
#[tokio::test]
async fn direct_connector_basic_test(