diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 53df114f..34fbc167 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -239,6 +239,16 @@ jobs: mv ./target/$TARGET/release/easytier-web ./target/$TARGET/release/easytier-web-embed cargo build --release --verbose --target $TARGET --features=mimalloc + mkdir -p built-bins/$TARGET/release/ + mv ./target/$TARGET/release/easytier-web-embed ./built-bins/$TARGET/release/easytier-web-embed + mv ./target/$TARGET/release/easytier-web ./built-bins/$TARGET/release/easytier-web + mv ./target/$TARGET/release/easytier-core ./built-bins/$TARGET/release/easytier-core + mv ./target/$TARGET/release/easytier-cli ./built-bins/$TARGET/release/easytier-cli + + # remove dirs to avoid copy many files back + rm -rf ./target ~/.cargo + mv ./built-bins ./target + - name: Compress run: | mkdir -p ./artifacts/objects/ diff --git a/Cargo.lock b/Cargo.lock index b18b56c9..a094f74a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -809,14 +809,14 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.71.1" +version = "0.72.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" dependencies = [ "bitflags 2.8.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "regex", @@ -2172,7 +2172,7 @@ dependencies = [ "dbus", "defguard_wireguard_rs", "derive_builder", - "easytier-rpc-build 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "easytier-rpc-build", "encoding", "flume 0.12.0", "futures", @@ -2339,16 +2339,6 @@ dependencies = [ "prost-build", ] -[[package]] -name = "easytier-rpc-build" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24829168c28f6a448f57d18116c255dcbd2b8c25e76dbc60f6cd16d68ad2cf07" -dependencies = [ - "heck 0.5.0", - "prost-build", -] - [[package]] name = "easytier-uptime" version = "0.1.0" @@ -4322,11 +4312,11 @@ dependencies = [ [[package]] name = "kcp-sys" version = "0.1.0" -source = "git+https://github.com/EasyTier/kcp-sys?rev=71eff18c573a4a71bf99c7fabc6a8b9f211c84c1#71eff18c573a4a71bf99c7fabc6a8b9f211c84c1" +source = "git+https://github.com/EasyTier/kcp-sys?rev=94964794caaed5d388463137da59b97499619e5f#94964794caaed5d388463137da59b97499619e5f" dependencies = [ "anyhow", "auto_impl", - "bindgen 0.71.1", + "bindgen 0.72.1", "bitflags 2.8.0", "bytes", "cc", diff --git a/easytier-rpc-build/src/lib.rs b/easytier-rpc-build/src/lib.rs index 3d24dadb..d2162578 100644 --- a/easytier-rpc-build/src/lib.rs +++ b/easytier-rpc-build/src/lib.rs @@ -29,6 +29,7 @@ impl prost_build::ServiceGenerator for ServiceGenerator { let method_descriptor_name = format!("{}MethodDescriptor", service.name); let mut trait_methods = String::new(); + let mut weak_impl_methods = String::new(); let mut enum_methods = String::new(); let mut list_enum_methods = String::new(); let mut client_methods = String::new(); @@ -66,6 +67,21 @@ impl prost_build::ServiceGenerator for ServiceGenerator { ) .unwrap(); + writeln!( + weak_impl_methods, + r#" async fn {method_name}(&self, ctrl: Self::Controller, input: {input_type}) -> {namespace}::error::Result<{output_type}> {{ + let Some(service) = self.upgrade() else {{ + return Err({namespace}::error::Error::Shutdown); + }}; + service.{method_name}(ctrl, input).await + }}"#, + method_name = method.name, + input_type = method.input_type, + output_type = method.output_type, + namespace = NAMESPACE, + ) + .unwrap(); + ServiceGenerator::write_comments(&mut enum_methods, 4, &method.comments).unwrap(); writeln!( enum_methods, @@ -178,6 +194,17 @@ pub trait {name} {{ {trait_methods} }} +#[async_trait::async_trait] +impl {name} for ::std::sync::Weak +where + T: Send + Sync + 'static, + ::std::sync::Arc: {name}, +{{ + type Controller = <::std::sync::Arc as {name}>::Controller; + + {weak_impl_methods} +}} + /// A service descriptor for a `{name}`. #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Default)] pub struct {descriptor_name}; @@ -250,6 +277,16 @@ impl {namespace}::__rt::RpcClientFactory for {client_name}Factory where C: #[derive(Clone, Debug)] pub struct {server_name}(A) where A: {name} + Clone + Send + 'static; +impl {server_name}<::std::sync::Weak> +where + T: Send + Sync + 'static, + ::std::sync::Arc: {name}, +{{ + pub fn new_arc(service: ::std::sync::Arc) -> {server_name}<::std::sync::Weak> {{ + {server_name}(::std::sync::Arc::downgrade(&service)) + }} +}} + impl {server_name} where A: {name} + Clone + Send + 'static {{ /// Creates a new server instance that dispatches all calls to the supplied service. pub fn new(service: A) -> {server_name} {{ @@ -345,6 +382,7 @@ impl {namespace}::descriptor::MethodDescriptor for {method_descriptor_name} {{ proto_name = service.proto_name, package = service.package, trait_methods = trait_methods, + weak_impl_methods = weak_impl_methods, enum_methods = enum_methods, list_enum_methods = list_enum_methods, client_own_methods = client_own_methods, diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 7f635dad..78609387 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -286,9 +286,7 @@ tonic-build = "0.12" globwalk = "0.8.1" regex = "1" prost-build = "0.13.2" -rpc_build = { package = "easytier-rpc-build", version = "0.1.0", features = [ - "internal-namespace", -] } +easytier-rpc-build = { path = "../easytier-rpc-build", features = [ "internal-namespace" ] } prost-reflect-build = { version = "0.14.0" } [target.'cfg(windows)'.build-dependencies] diff --git a/easytier/build.rs b/easytier/build.rs index bd4e3c00..363a70ef 100644 --- a/easytier/build.rs +++ b/easytier/build.rs @@ -183,7 +183,7 @@ fn main() -> Result<(), Box> { ) .type_attribute("common.RpcDescriptor", "#[derive(Hash, Eq)]") .field_attribute(".api.manage.NetworkConfig", "#[serde(default)]") - .service_generator(Box::new(rpc_build::ServiceGenerator::new())) + .service_generator(Box::new(easytier_rpc_build::ServiceGenerator::default())) .btree_map(["."]) .skip_debug([".common.Ipv4Addr", ".common.Ipv6Addr", ".common.UUID"]); diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 91fea7dc..840b0c84 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -1,5 +1,5 @@ use std::collections::hash_map::DefaultHasher; -use std::net::IpAddr; +use std::net::{IpAddr, SocketAddr}; use std::{ hash::Hasher, sync::{Arc, Mutex}, @@ -257,6 +257,13 @@ impl GlobalCtx { } } + pub fn is_ip_local_virtual_ip(&self, ip: &IpAddr) -> bool { + match ip { + IpAddr::V4(v4) => self.get_ipv4().map(|x| x.address() == *v4).unwrap_or(false), + IpAddr::V6(v6) => self.get_ipv6().map(|x| x.address() == *v6).unwrap_or(false), + } + } + pub fn get_network_identity(&self) -> NetworkIdentity { self.config.get_network_identity() } @@ -303,18 +310,6 @@ impl GlobalCtx { } } - pub fn is_port_in_running_listeners(&self, port: u16, is_udp: bool) -> bool { - let check_proto = |listener_proto: &str| { - let listener_is_udp = matches!(listener_proto, "udp" | "wg"); - listener_is_udp == is_udp - }; - self.running_listeners - .lock() - .unwrap() - .iter() - .any(|x| x.port() == Some(port) && check_proto(x.scheme())) - } - pub fn get_vpn_portal_cidr(&self) -> Option { self.config.get_vpn_portal_config().map(|x| x.client_cidr) } @@ -447,6 +442,46 @@ impl GlobalCtx { // NOTICE: p2p only is conflict with latency first self.config.get_flags().latency_first && !self.p2p_only } + + fn is_port_in_running_listeners(&self, port: u16, is_udp: bool) -> bool { + let check_proto = |listener_proto: &str| { + let listener_is_udp = matches!(listener_proto, "udp" | "wg"); + listener_is_udp == is_udp + }; + self.running_listeners + .lock() + .unwrap() + .iter() + .any(|x| x.port() == Some(port) && check_proto(x.scheme())) + } + + #[tracing::instrument(ret, skip(self))] + pub fn should_deny_proxy(&self, dst_addr: &SocketAddr, is_udp: bool) -> bool { + let _g = self.net_ns.guard(); + let ip = dst_addr.ip(); + // first check if ip is virtual ip + // then try bind this ip, if succ means it is local ip + let dst_is_local_virtual_ip = self.is_ip_local_virtual_ip(&ip); + // this is an expensive operation, should be called sparingly + // 1. tcp/kcp/quic call this only after proxy conn is established + // 2. udp cache the result in nat entry + let dst_is_local_phy_ip = std::net::UdpSocket::bind(format!("{}:0", ip)).is_ok(); + + tracing::trace!( + "check should_deny_proxy: dst_addr={}, dst_is_local_virtual_ip={}, dst_is_local_phy_ip={}, is_udp={}", + dst_addr, + dst_is_local_virtual_ip, + dst_is_local_phy_ip, + is_udp + ); + + if dst_is_local_virtual_ip || dst_is_local_phy_ip { + // if is local ip, make sure the port is not one of the listening ports + self.is_port_in_running_listeners(dst_addr.port(), is_udp) + } else { + false + } + } } #[cfg(test)] diff --git a/easytier/src/common/netns.rs b/easytier/src/common/netns.rs index 27f5289f..95d6b3f7 100644 --- a/easytier/src/common/netns.rs +++ b/easytier/src/common/netns.rs @@ -74,7 +74,7 @@ impl NetNSGuard { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct NetNS { name: Option, } diff --git a/easytier/src/common/stun.rs b/easytier/src/common/stun.rs index 4201fe13..2829f282 100644 --- a/easytier/src/common/stun.rs +++ b/easytier/src/common/stun.rs @@ -1321,7 +1321,10 @@ impl StunInfoCollectorTrait for MockStunInfoCollector { #[cfg(test)] mod tests { - use crate::tunnel::{udp::UdpTunnelListener, TunnelListener}; + use crate::{ + common::scoped_task::ScopedTask, + tunnel::{udp::UdpTunnelListener, TunnelListener}, + }; use super::*; @@ -1406,11 +1409,11 @@ mod tests { use stun_codec::rfc5389::attributes::XorMappedAddress; use tokio::net::TcpListener; - async fn spawn_tcp_stun_server() -> SocketAddr { + async fn spawn_tcp_stun_server() -> (SocketAddr, ScopedTask<()>) { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let server_addr = listener.local_addr().unwrap(); - tokio::spawn(async move { + let task = tokio::spawn(async move { let (mut stream, peer_addr) = listener.accept().await.unwrap(); let req = TcpStunClient::tcp_read_stun_message(&mut stream, Duration::from_secs(2)) @@ -1430,11 +1433,11 @@ mod tests { stream.write_all(rsp_buf.as_slice()).await.unwrap(); }); - server_addr + (server_addr, task.into()) } - let server1 = spawn_tcp_stun_server().await; - let server2 = spawn_tcp_stun_server().await; + let (server1, _t1) = spawn_tcp_stun_server().await; + let (server2, _t2) = spawn_tcp_stun_server().await; let stun_servers = vec![server1.to_string(), server2.to_string()]; let detector = TcpNatTypeDetector::new(stun_servers, 1); @@ -1469,7 +1472,7 @@ mod tests { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let server_addr = listener.local_addr().unwrap(); - tokio::spawn(async move { + let _t = ScopedTask::from(tokio::spawn(async move { for _ in 0..8 { let Ok((mut stream, peer_addr)) = listener.accept().await else { break; @@ -1491,7 +1494,7 @@ mod tests { let rsp_buf = encoder.encode_into_bytes(resp_msg).unwrap(); stream.write_all(rsp_buf.as_slice()).await.unwrap(); } - }); + })); let collector = StunInfoCollector::new(vec![], vec![server_addr.to_string()], vec![]); collector.set_tcp_stun_servers(vec![server_addr.to_string()]); diff --git a/easytier/src/connector/tcp_hole_punch.rs b/easytier/src/connector/tcp_hole_punch.rs index e6373f9d..efe30511 100644 --- a/easytier/src/connector/tcp_hole_punch.rs +++ b/easytier/src/connector/tcp_hole_punch.rs @@ -538,7 +538,7 @@ impl TcpHolePunchConnector { .rpc_server() .registry() .register( - TcpHolePunchRpcServer::new(self.server.clone()), + TcpHolePunchRpcServer::new_arc(self.server.clone()), &self.peer_mgr.get_global_ctx().get_network_name(), ); Ok(()) diff --git a/easytier/src/connector/udp_hole_punch/mod.rs b/easytier/src/connector/udp_hole_punch/mod.rs index 2238b73b..b8c6249a 100644 --- a/easytier/src/connector/udp_hole_punch/mod.rs +++ b/easytier/src/connector/udp_hole_punch/mod.rs @@ -547,7 +547,7 @@ impl UdpHolePunchConnector { .rpc_server() .registry() .register( - UdpHolePunchRpcServer::new(self.server.clone()), + UdpHolePunchRpcServer::new(Arc::downgrade(&self.server)), &self.peer_mgr.get_global_ctx().get_network_name(), ); diff --git a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs index 21c51da8..fda62d85 100644 --- a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs +++ b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs @@ -641,6 +641,8 @@ pub mod tests { #[tokio::test] #[serial_test::serial(hole_punch)] async fn hole_punching_symmetric_only_predict(#[values("true", "false")] is_inc: bool) { + use crate::common::scoped_task::ScopedTask; + RUN_TESTING.store(true, std::sync::atomic::Ordering::Relaxed); let p_a = create_mock_peer_manager_with_mock_stun(if is_inc { @@ -689,10 +691,12 @@ pub mod tests { let counter = Arc::new(AtomicU32::new(0)); + let mut tasks: Vec> = vec![]; + // all these sockets should receive hole punching packet for udp in udps.iter().map(Arc::clone) { let counter = counter.clone(); - tokio::spawn(async move { + tasks.push(ScopedTask::from(tokio::spawn(async move { let mut buf = [0u8; 1024]; let (len, addr) = udp.recv_from(&mut buf).await.unwrap(); println!( @@ -702,7 +706,7 @@ pub mod tests { udp.local_addr() ); counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - }); + }))); } hole_punching_a.client.run_immediately().await; diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index 2fb51056..a5c8bfe9 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -230,8 +230,10 @@ impl TcpProxyForKcpSrcTrait for TcpProxyForKcpSrc { } async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool { - self.0 - .get_peer_manager() + let Some(peer_manager) = self.0.get_peer_manager() else { + return false; + }; + peer_manager .check_allow_kcp_to_dst(&IpAddr::V4(*dst_ip)) .await } @@ -503,19 +505,16 @@ impl KcpProxyDst { route.get_peer_groups_by_ip(&dst_ip) ); - let send_to_self = - Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address())); + if global_ctx.should_deny_proxy(&dst_socket, false) { + return Err(anyhow::anyhow!( + "dst socket {:?} is in running listeners, ignore it", + dst_socket + ) + .into()); + } + let send_to_self = global_ctx.is_ip_local_virtual_ip(&dst_ip); if send_to_self && global_ctx.no_tun() { - if global_ctx.is_port_in_running_listeners(dst_socket.port(), false) - && global_ctx.is_ip_in_same_network(&src_ip) - { - return Err(anyhow::anyhow!( - "dst socket {:?} is in running listeners, ignore it", - dst_socket - ) - .into()); - } dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap(); } diff --git a/easytier/src/gateway/quic_proxy.rs b/easytier/src/gateway/quic_proxy.rs index 7a383684..bc7cbeae 100644 --- a/easytier/src/gateway/quic_proxy.rs +++ b/easytier/src/gateway/quic_proxy.rs @@ -192,8 +192,10 @@ impl TcpProxyForKcpSrcTrait for TcpProxyForQUICSrc { } async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool { - let peer_map: Arc = - self.0.get_peer_manager().get_peer_map(); + let Some(peer_manager) = self.0.get_peer_manager() else { + return false; + }; + let peer_map: Arc = peer_manager.get_peer_map(); let Some(dst_peer_id) = peer_map.get_peer_id_by_ipv4(dst_ip).await else { return false; }; @@ -414,17 +416,16 @@ impl QUICProxyDst { route.get_peer_groups_by_ipv4(&dst_ip) ); - let send_to_self = Some(*dst_socket.ip()) == ctx.get_ipv4().map(|ip| ip.address()); + if ctx.should_deny_proxy(&dst_socket.into(), false) { + return Err(anyhow::anyhow!( + "dst socket {:?} is in running listeners, ignore it", + dst_socket + ) + .into()); + } + + let send_to_self = ctx.is_ip_local_virtual_ip(&dst_ip.into()); if send_to_self && ctx.no_tun() { - if ctx.is_port_in_running_listeners(dst_socket.port(), false) - && ctx.is_ip_in_same_network(&src_ip) - { - return Err(anyhow::anyhow!( - "dst socket {:?} is in running listeners, ignore it", - dst_socket - ) - .into()); - } dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap(); } diff --git a/easytier/src/gateway/socks5.rs b/easytier/src/gateway/socks5.rs index ec835372..2b1a48dc 100644 --- a/easytier/src/gateway/socks5.rs +++ b/easytier/src/gateway/socks5.rs @@ -233,7 +233,6 @@ impl AsyncTcpConnector for Socks5KcpConnector { kcp_endpoint, peer_mgr: self.peer_mgr.clone(), }; - println!("connect to kcp endpoint, addr = {:?}", addr); let ret = c .connect(self.src_addr, addr) .await @@ -355,7 +354,7 @@ impl Socks5ServerNet { pub fn new( ipv4_addr: cidr::Ipv4Inet, auth: Option, - peer_manager: Arc, + peer_manager: Weak, packet_recv: Arc>>, entries: Socks5EntrySet, ) -> Self { @@ -390,6 +389,10 @@ impl Socks5ServerNet { let dst = ipv4.get_destination(); let packet = ZCPacket::new_with_payload(&data); + let Some(peer_manager) = peer_manager.upgrade() else { + tracing::warn!("peer manager is gone, smoltcp sender exited"); + return; + }; if let Err(e) = peer_manager .send_msg_by_ip(packet, IpAddr::V4(dst), false) .await @@ -474,7 +477,7 @@ struct UdpClientKey { pub struct Socks5Server { global_ctx: Arc, - peer_manager: Arc, + peer_manager: Weak, auth: Option, tasks: Arc>>, @@ -587,7 +590,7 @@ impl Socks5Server { let (packet_sender, packet_recv) = mpsc::channel(1024); Arc::new(Self { global_ctx, - peer_manager, + peer_manager: Arc::downgrade(&peer_manager), auth, tasks: Arc::new(std::sync::Mutex::new(JoinSet::new())), @@ -675,7 +678,7 @@ impl Socks5Server { )?; let entries = self.entries.clone(); - let peer_manager = Arc::downgrade(&self.peer_manager); + let peer_manager = self.peer_manager.clone(); let net = self.net.clone(); self.tasks.lock().unwrap().spawn(async move { loop { @@ -714,7 +717,10 @@ impl Socks5Server { let cfgs = self.global_ctx.config.get_port_forwards(); self.reload_port_forwards(&cfgs).await?; - self.peer_manager + let Some(peer_manager) = self.peer_manager.upgrade() else { + return Err(anyhow::anyhow!("peer manager is gone").into()); + }; + peer_manager .add_packet_process_pipeline(Box::new(self.clone())) .await; @@ -806,7 +812,7 @@ impl Socks5Server { join_joinset_background(tasks.clone(), "tcp port forward".to_string()); let forward_tasks = tasks; let kcp_endpoint = self.kcp_endpoint.lock().await.clone(); - let peer_mgr = Arc::downgrade(&self.peer_manager.clone()); + let peer_mgr = self.peer_manager.clone(); let cancel_token = CancellationToken::new(); self.cancel_tokens .insert(cfg.clone(), cancel_token.clone().drop_guard()); diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index 9544f7d1..b6a5c731 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -316,7 +316,7 @@ type AddrConnSockMap = Arc>; #[derive(Debug)] pub struct TcpProxy { global_ctx: Arc, - peer_manager: Arc, + peer_manager: Weak, local_port: AtomicU16, tasks: Arc>>, @@ -346,8 +346,10 @@ impl PeerPacketFilter for TcpProxy { if let Err(e) = smoltcp_stack_sender.try_send(packet) { tracing::error!("send to smoltcp stack failed: {:?}", e); } - } else if let Err(e) = self.peer_manager.get_nic_channel().send(packet).await { - tracing::error!("send to nic failed: {:?}", e); + } else if let Some(peer_manager) = self.get_peer_manager() { + if let Err(e) = peer_manager.get_nic_channel().send(packet).await { + tracing::error!("send to nic failed: {:?}", e); + } } return None; } else { @@ -443,7 +445,7 @@ impl TcpProxy { Arc::new(Self { global_ctx: global_ctx.clone(), - peer_manager, + peer_manager: Arc::downgrade(&peer_manager), local_port: AtomicU16::new(0), tasks: Arc::new(std::sync::Mutex::new(JoinSet::new())), @@ -467,6 +469,10 @@ impl TcpProxy { }) } + pub fn get_peer_manager(&self) -> Option> { + self.peer_manager.upgrade() + } + fn update_tcp_packet_checksum( tcp_packet: &mut MutableTcpPacket, ipv4_src: &Ipv4Addr, @@ -487,10 +493,13 @@ impl TcpProxy { self.run_syn_map_cleaner().await?; self.run_listener().await?; if add_pipeline { - self.peer_manager + let peer_manager = self + .get_peer_manager() + .ok_or_else(|| anyhow::anyhow!("peer manager is gone"))?; + peer_manager .add_packet_process_pipeline(Box::new(self.clone())) .await; - self.peer_manager + peer_manager .add_nic_packet_process_pipeline(Box::new(self.clone())) .await; } @@ -569,6 +578,10 @@ impl TcpProxy { let dst = ipv4.get_destination(); let packet = ZCPacket::new_with_payload(&data); + let Some(peer_mgr) = peer_mgr.upgrade() else { + tracing::warn!("peer manager is gone, smoltcp sender exited"); + return; + }; if let Err(e) = peer_mgr .send_msg_by_ip(packet, IpAddr::V4(dst), false) .await @@ -734,21 +747,18 @@ impl TcpProxy { tracing::warn!("set_nodelay failed, ignore it: {:?}", e); } - let nat_dst = if Some(nat_entry.real_dst.ip()) - == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address())) - { - if global_ctx.is_port_in_running_listeners(nat_entry.real_dst.port(), false) - && global_ctx.is_ip_in_same_network(&nat_entry.src.ip()) - { - tracing::error!( - ?nat_entry, - "nat dst port {} is in running listeners, ignore it", - nat_entry.real_dst.port() - ); - nat_entry.state.store(NatDstEntryState::Closed); - Self::remove_entry_from_all_conn_map(conn_map, addr_conn_map, nat_entry); - return; - } + if global_ctx.should_deny_proxy(&nat_entry.real_dst, false) { + tracing::error!( + ?nat_entry, + "nat dst port {} is in running listeners, ignore it", + nat_entry.real_dst.port() + ); + nat_entry.state.store(NatDstEntryState::Closed); + Self::remove_entry_from_all_conn_map(conn_map, addr_conn_map, nat_entry); + return; + } + + let nat_dst = if global_ctx.is_ip_local_virtual_ip(&nat_entry.real_dst.ip()) { format!("127.0.0.1:{}", nat_entry.real_dst.port()) .parse() .unwrap() @@ -831,7 +841,10 @@ impl TcpProxy { } pub fn get_my_peer_id(&self) -> u32 { - self.peer_manager.my_peer_id() + self.peer_manager + .upgrade() + .map(|pm| pm.my_peer_id()) + .unwrap_or_default() } pub fn get_local_ip(&self) -> Option { @@ -952,10 +965,6 @@ impl TcpProxy { Some(()) } - pub fn get_peer_manager(&self) -> &Arc { - &self.peer_manager - } - pub fn is_tcp_proxy_connection(&self, src: SocketAddr) -> bool { self.syn_map.contains_key(&src) || self.addr_conn_map.contains_key(&src) } diff --git a/easytier/src/gateway/tokio_smoltcp/mod.rs b/easytier/src/gateway/tokio_smoltcp/mod.rs index 4a162ff4..011edc14 100644 --- a/easytier/src/gateway/tokio_smoltcp/mod.rs +++ b/easytier/src/gateway/tokio_smoltcp/mod.rs @@ -12,7 +12,6 @@ use std::{ }; use device::BufferDevice; -use futures::Future; use reactor::Reactor; pub use smoltcp; use smoltcp::{ @@ -24,6 +23,8 @@ pub use socket::{TcpListener, TcpStream, UdpSocket}; pub use socket_allocator::BufferSize; use tokio::sync::Notify; +use crate::common::scoped_task::ScopedTask; + /// The async devices. pub mod channel_device; pub mod device; @@ -78,6 +79,7 @@ pub struct Net { ip_addr: IpCidr, from_port: AtomicU16, stopper: Arc, + fut: ScopedTask>, } impl std::fmt::Debug for Net { @@ -92,15 +94,10 @@ impl std::fmt::Debug for Net { impl Net { /// Creates a new `Net` instance. It panics if the medium is not supported. pub fn new(device: D, config: NetConfig) -> Net { - let (net, fut) = Self::new2(device, config); - tokio::spawn(fut); - net + Self::new2(device, config) } - fn new2( - device: D, - config: NetConfig, - ) -> (Net, impl Future> + Send) { + fn new2(device: D, config: NetConfig) -> Net { let mut buffer_device = BufferDevice::new(device.capabilities().clone()); let mut iface = Interface::new(config.interface_config, &mut buffer_device, Instant::now()); let ip_addr = config.ip_addr; @@ -129,15 +126,13 @@ impl Net { stopper.clone(), ); - ( - Net { - reactor: Arc::new(reactor), - ip_addr: config.ip_addr, - from_port: AtomicU16::new(10001), - stopper, - }, - fut, - ) + Net { + reactor: Arc::new(reactor), + ip_addr: config.ip_addr, + from_port: AtomicU16::new(10001), + stopper, + fut: ScopedTask::from(tokio::spawn(fut)), + } } pub fn get_address(&self) -> IpAddr { self.ip_addr.address().into() diff --git a/easytier/src/gateway/udp_proxy.rs b/easytier/src/gateway/udp_proxy.rs index 6bdce796..1f5eaaaf 100644 --- a/easytier/src/gateway/udp_proxy.rs +++ b/easytier/src/gateway/udp_proxy.rs @@ -1,6 +1,6 @@ use std::{ net::{Ipv4Addr, SocketAddr, SocketAddrV4}, - sync::{atomic::AtomicBool, Arc}, + sync::{atomic::AtomicBool, Arc, Weak}, time::Duration, }; @@ -46,25 +46,35 @@ struct UdpNatEntry { src_peer_id: PeerId, my_peer_id: PeerId, src_socket: SocketAddr, - socket: UdpSocket, + socket: Option, forward_task: Mutex>>, stopped: AtomicBool, start_time: std::time::Instant, last_active_time: AtomicCell, + denied: bool, } impl UdpNatEntry { #[tracing::instrument(err(level = Level::WARN))] - fn new(src_peer_id: PeerId, my_peer_id: PeerId, src_socket: SocketAddr) -> Result { + fn new( + src_peer_id: PeerId, + my_peer_id: PeerId, + src_socket: SocketAddr, + denied: bool, + ) -> Result { // TODO: try use src port, so we will be ip restricted nat type - let socket2_socket = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - )?; - let dst_socket_addr = "0.0.0.0:0".parse().unwrap(); - setup_sokcet2(&socket2_socket, &dst_socket_addr)?; - let socket = UdpSocket::from_std(socket2_socket.into())?; + let socket = if denied { + None + } else { + let socket2_socket = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + )?; + let dst_socket_addr = "0.0.0.0:0".parse().unwrap(); + setup_sokcet2(&socket2_socket, &dst_socket_addr)?; + Some(UdpSocket::from_std(socket2_socket.into())?) + }; Ok(Self { src_peer_id, @@ -75,6 +85,7 @@ impl UdpNatEntry { stopped: AtomicBool::new(false), start_time: std::time::Instant::now(), last_active_time: AtomicCell::new(std::time::Instant::now()), + denied, }) } @@ -165,7 +176,11 @@ impl UdpNatEntry { let (len, src_socket) = match timeout( Duration::from_secs(120), - self_clone.socket.recv_buf_from(&mut cur_buf), + self_clone + .socket + .as_ref() + .unwrap() + .recv_buf_from(&mut cur_buf), ) .await { @@ -239,7 +254,7 @@ impl UdpNatEntry { #[derive(Debug)] pub struct UdpProxy { global_ctx: ArcGlobalCtx, - peer_manager: Arc, + peer_manager: Weak, cidr_set: CidrSet, @@ -299,22 +314,7 @@ impl UdpProxy { }; // TODO: should it be async. - let dst_socket = if Some(ipv4.get_destination()) - == self.global_ctx.get_ipv4().as_ref().map(Ipv4Inet::address) - { - if self - .global_ctx - .is_port_in_running_listeners(udp_packet.get_destination(), true) - && self - .global_ctx - .is_ip_in_same_network(&std::net::IpAddr::V4(ipv4.get_source())) - { - tracing::debug!( - dst_port = udp_packet.get_destination(), - "dst socket is in running listeners, ignore it" - ); - return Some(()); - } + let dst_socket = if self.global_ctx.is_ip_local_virtual_ip(&real_dst_ip.into()) { format!("127.0.0.1:{}", udp_packet.get_destination()) .parse() .unwrap() @@ -337,16 +337,29 @@ impl UdpProxy { .entry(nat_key) .or_try_insert_with::(|| { tracing::info!(?packet, ?ipv4, ?udp_packet, "udp nat table entry created"); + let denied = self.global_ctx.should_deny_proxy( + &SocketAddr::new(real_dst_ip.into(), udp_packet.get_destination()), + true, + ); let _g = self.global_ctx.net_ns.guard(); Ok(Arc::new(UdpNatEntry::new( hdr.from_peer_id.get(), hdr.to_peer_id.get(), nat_key.src_socket, + denied, )?)) }) .ok()? .clone(); + if nat_entry.denied { + tracing::debug!( + dst_port = udp_packet.get_destination(), + "dst socket is in running listeners, ignore it" + ); + return Some(()); + } + if nat_entry.forward_task.lock().await.is_none() { nat_entry .forward_task @@ -367,6 +380,8 @@ impl UdpProxy { let _g = self.global_ctx.net_ns.guard(); nat_entry .socket + .as_ref() + .unwrap() .send_to(udp_packet.payload(), dst_socket) .await }; @@ -405,7 +420,7 @@ impl UdpProxy { let (sender, receiver) = channel(1024); let ret = Self { global_ctx, - peer_manager, + peer_manager: Arc::downgrade(&peer_manager), cidr_set, nat_table: Arc::new(DashMap::new()), sender, @@ -417,7 +432,10 @@ impl UdpProxy { } pub async fn start(self: &Arc) -> Result<(), Error> { - self.peer_manager + let Some(peer_manager) = self.peer_manager.upgrade() else { + return Err(anyhow::anyhow!("peer manager is gone").into()); + }; + peer_manager .add_packet_process_pipeline(Box::new(self.clone())) .await; @@ -457,7 +475,11 @@ impl UdpProxy { hdr.set_latency_first(is_latency_first); let to_peer_id = hdr.to_peer_id.into(); tracing::trace!(?msg, ?to_peer_id, "udp nat packet response send"); - let ret = peer_manager.send_msg_for_proxy(msg, to_peer_id).await; + let Some(pm) = peer_manager.upgrade() else { + tracing::warn!("peer manager is gone, udp proxy send loop exit"); + return; + }; + let ret = pm.send_msg_for_proxy(msg, to_peer_id).await; if ret.is_err() { tracing::error!("send icmp packet to peer failed: {:?}", ret); } diff --git a/easytier/src/instance/dns_server/server_instance.rs b/easytier/src/instance/dns_server/server_instance.rs index b6eaf8aa..eab04695 100644 --- a/easytier/src/instance/dns_server/server_instance.rs +++ b/easytier/src/instance/dns_server/server_instance.rs @@ -518,7 +518,7 @@ impl MagicDnsServerInstance { rpc_server .registry() - .register(MagicDnsServerRpcServer::new(data.clone()), ""); + .register(MagicDnsServerRpcServer::new_arc(data.clone()), ""); rpc_server.set_hook(data.clone()); peer_mgr diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index be9c17bc..97771a73 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -725,8 +725,8 @@ impl PeerManager { let mut processed = false; let mut zc_packet = Some(ret); - for (idx, pipeline) in pipe_line.read().await.iter().rev().enumerate() { - tracing::trace!(?zc_packet, ?idx, "try_process_packet_from_peer"); + tracing::trace!(?zc_packet, "try_process_packet_from_peer"); + for pipeline in pipe_line.read().await.iter().rev() { zc_packet = pipeline .try_process_packet_from_peer(zc_packet.unwrap()) .await; diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index 27d0c04b..bc6b42be 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -1481,10 +1481,60 @@ pub async fn relay_bps_limit_test(#[values(100, 200, 400, 800)] bps_limit: u64) drop_insts(insts).await; } +async fn assert_try_direct_connect_err(inst: &Instance, connector: C) +where + C: crate::tunnel::TunnelConnector + std::fmt::Debug, +{ + let ret = tokio::time::timeout( + Duration::from_millis(100), + inst.get_peer_manager().try_direct_connect(connector), + ) + .await; + + assert!(matches!(ret, Err(_) | Ok(Err(_)))); +} + +use std::fs; +use std::io; + +fn print_all_fds() -> io::Result<()> { + let fd_dir = "/proc/self/fd"; + + // 读取 /proc/self/fd 目录中的所有条目 + for entry in fs::read_dir(fd_dir)? { + let entry = entry?; + let file_name = entry.file_name(); + let fd_str = file_name.to_string_lossy(); + + // 尝试解析为数字(跳过 . 和 ..) + if let Ok(fd_num) = fd_str.parse::() { + // 获取文件描述符指向的文件路径(如果可能) + let target_path = format!("{}/{}", fd_dir, fd_num); + match fs::read_link(&target_path) { + Ok(target) => { + println!("FD {}: {}", fd_num, target.to_string_lossy()); + } + Err(e) => { + println!("FD {}: (unreadable: {})", fd_num, e); + } + } + } + } + Ok(()) +} + #[rstest::rstest] #[serial_test::serial] #[tokio::test] -async fn avoid_tunnel_loop_back_to_virtual_network(#[values(true, false)] no_tun: bool) { +async fn avoid_tunnel_loop_back_to_virtual_network( + #[values(true, false)] no_tun: bool, + #[values(true, false)] enable_kcp_proxy: bool, + #[values(true, false)] enable_quic_proxy: bool, +) { + if enable_kcp_proxy && enable_quic_proxy { + return; + } + let insts = init_three_node_ex( "udp", |cfg| { @@ -1493,27 +1543,52 @@ async fn avoid_tunnel_loop_back_to_virtual_network(#[values(true, false)] no_tun flags.no_tun = no_tun; cfg.set_flags(flags); } + + if cfg.get_inst_name().as_str() == "inst1" { + let mut flags = cfg.get_flags(); + flags.enable_kcp_proxy = enable_kcp_proxy; + flags.enable_quic_proxy = enable_quic_proxy; + cfg.set_flags(flags); + } + + if cfg.get_inst_name().as_str() == "inst3" { + cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None) + .unwrap(); + } + cfg }, false, ) .await; - let tcp_connector = TcpTunnelConnector::new("tcp://10.144.144.2:11010".parse().unwrap()); - insts[0] - .get_peer_manager() - .try_direct_connect(tcp_connector) - .await - .unwrap_err(); + assert_try_direct_connect_err( + &insts[0], + TcpTunnelConnector::new("tcp://10.144.144.2:11010".parse().unwrap()), + ) + .await; - let udp_connector = UdpTunnelConnector::new("udp://10.144.144.3:11010".parse().unwrap()); - insts[0] - .get_peer_manager() - .try_direct_connect(udp_connector) - .await - .unwrap_err(); + assert_try_direct_connect_err( + &insts[0], + UdpTunnelConnector::new("udp://10.144.144.3:11010".parse().unwrap()), + ) + .await; + + assert_try_direct_connect_err( + &insts[0], + TcpTunnelConnector::new("tcp://10.1.2.3:11010".parse().unwrap()), + ) + .await; + + assert_try_direct_connect_err( + &insts[0], + UdpTunnelConnector::new("udp://10.1.2.3:11010".parse().unwrap()), + ) + .await; drop_insts(insts).await; + + let _ = print_all_fds(); } #[rstest::rstest] diff --git a/easytier/src/utils.rs b/easytier/src/utils.rs index c6254b64..88b88322 100644 --- a/easytier/src/utils.rs +++ b/easytier/src/utils.rs @@ -61,7 +61,6 @@ pub fn init_logger( let _ = CURRENT_LOG_LEVEL.set(std::sync::Mutex::new(file_level.to_string())); std::thread::spawn(move || { - println!("Start log filter reloader"); while let Ok(lf) = recver.recv() { let e = file_filter_reloader.modify(|f| { if let Ok(nf) = EnvFilter::builder()