diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9ae72286..9e42b837 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -105,15 +105,12 @@ jobs: matrix: include: - name: "easytier" - opts: "-E 'not test(connector::dns_connector::tests) and not test(tests::three_node)' --test-threads 1 --no-fail-fast" + opts: "-E 'not test(tests::three_node)' --test-threads 1 --no-fail-fast" - - name: "easytier::connector::dns_connector::tests" - opts: "-E 'test(connector::dns_connector::tests)' --test-threads 1 --no-fail-fast" - - - name: "easytier::tests::three_node" + - name: "three_node" opts: "-E 'test(tests::three_node) and not test(subnet_proxy_three_node_test)' --test-threads 1 --no-fail-fast" - - name: "easytier::tests::three_node::subnet_proxy_three_node_test" + - name: "three_node::subnet_proxy_three_node_test" opts: "-E 'test(subnet_proxy_three_node_test)' --test-threads 1 --no-fail-fast" steps: - uses: actions/checkout@v3 diff --git a/Cargo.lock b/Cargo.lock index b27a6825..77e118d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5238,11 +5238,10 @@ dependencies = [ [[package]] name = "num-bigint-dig" -version = "0.8.4" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +checksum = "e661dda6640fad38e827a6d4a310ff4763082116fe217f279885c97f511bb0b7" dependencies = [ - "byteorder", "lazy_static", "libm", "num-integer", diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 2abfe5f8..fa46d7bb 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -241,6 +241,7 @@ impl EasyTierLauncher { } instance_alive.store(false, std::sync::atomic::Ordering::Relaxed); notifier.notify_one(); + rt.shutdown_background(); })); } diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index e704cb17..e1e0c897 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -27,7 +27,9 @@ use crate::{ common::{CompressionAlgoPb, SecureModeConfig}, }, tunnel::{ - common::tests::{_tunnel_bench_netns, wait_for_condition}, + common::tests::{ + _tunnel_bench_netns, _tunnel_pingpong_netns_with_timeout, wait_for_condition, + }, ring::RingTunnelConnector, tcp::{TcpTunnelConnector, TcpTunnelListener}, udp::UdpTunnelConnector, @@ -419,8 +421,10 @@ pub async fn subnet_proxy_loop_prevention_test() { drop_insts(insts).await; } -async fn subnet_proxy_test_udp(listen_ip: &str, target_ip: &str) { - use crate::tunnel::{common::tests::_tunnel_pingpong_netns, udp::UdpTunnelListener}; +async fn subnet_proxy_test_udp(listen_ip: &str, target_ip: &str, timeout: Duration) { + use crate::tunnel::{ + common::tests::_tunnel_pingpong_netns_with_timeout, udp::UdpTunnelListener, + }; use rand::Rng; let udp_listener = @@ -438,14 +442,16 @@ async fn subnet_proxy_test_udp(listen_ip: &str, target_ip: &str) { "net_d" }; - _tunnel_pingpong_netns( + let result = _tunnel_pingpong_netns_with_timeout( udp_listener, udp_connector, NetNS::new(Some(ns_name.into())), NetNS::new(Some("net_a".into())), buf, + timeout, ) .await; + assert!(result.is_ok(), "{}", result.unwrap_err()); // no fragment let udp_listener = @@ -456,18 +462,22 @@ async fn subnet_proxy_test_udp(listen_ip: &str, target_ip: &str) { let mut buf = vec![0; 1024]; rand::thread_rng().fill(&mut buf[..]); - _tunnel_pingpong_netns( + let result = _tunnel_pingpong_netns_with_timeout( udp_listener, udp_connector, NetNS::new(Some(ns_name.into())), NetNS::new(Some("net_a".into())), buf, + timeout, ) .await; + assert!(result.is_ok(), "{}", result.unwrap_err()); } -async fn subnet_proxy_test_tcp(listen_ip: &str, connect_ip: &str) { - use crate::tunnel::{common::tests::_tunnel_pingpong_netns, tcp::TcpTunnelListener}; +async fn subnet_proxy_test_tcp(listen_ip: &str, connect_ip: &str, timeout: Duration) { + use crate::tunnel::{ + common::tests::_tunnel_pingpong_netns_with_timeout, tcp::TcpTunnelListener, + }; use rand::Rng; let tcp_listener = TcpTunnelListener::new(format!("tcp://{listen_ip}:22223").parse().unwrap()); @@ -483,26 +493,28 @@ async fn subnet_proxy_test_tcp(listen_ip: &str, connect_ip: &str) { "net_d" }; - _tunnel_pingpong_netns( + let result = _tunnel_pingpong_netns_with_timeout( tcp_listener, tcp_connector, NetNS::new(Some(ns_name.into())), NetNS::new(Some("net_a".into())), buf, + timeout, ) .await; + assert!(result.is_ok(), "{}", result.unwrap_err()); } -async fn subnet_proxy_test_icmp(target_ip: &str) { +async fn subnet_proxy_test_icmp(target_ip: &str, timeout: Duration) { wait_for_condition( || async { ping_test("net_a", target_ip, None).await }, - Duration::from_secs(5), + timeout, ) .await; wait_for_condition( || async { ping_test("net_a", target_ip, Some(5 * 1024)).await }, - Duration::from_secs(5), + timeout, ) .await; } @@ -538,10 +550,10 @@ pub async fn quic_proxy() { let target_ip = "10.1.2.4"; - subnet_proxy_test_icmp(target_ip).await; - subnet_proxy_test_icmp("10.144.144.3").await; - subnet_proxy_test_tcp(target_ip, target_ip).await; - subnet_proxy_test_tcp("0.0.0.0", "10.144.144.3").await; + subnet_proxy_test_icmp(target_ip, Duration::from_secs(5)).await; + subnet_proxy_test_icmp("10.144.144.3", Duration::from_secs(5)).await; + subnet_proxy_test_tcp(target_ip, target_ip, Duration::from_secs(5)).await; + subnet_proxy_test_tcp("0.0.0.0", "10.144.144.3", Duration::from_secs(5)).await; let metrics = insts[0] .get_global_ctx() @@ -629,14 +641,14 @@ pub async fn subnet_proxy_three_node_test( .await; for target_ip in ["10.1.3.4", "10.1.2.4", "10.144.144.3"] { - subnet_proxy_test_icmp(target_ip).await; + subnet_proxy_test_icmp(target_ip, Duration::from_secs(5)).await; let listen_ip = if target_ip == "10.144.144.3" { "0.0.0.0" } else { "10.1.2.4" }; - subnet_proxy_test_tcp(listen_ip, target_ip).await; - subnet_proxy_test_udp(listen_ip, target_ip).await; + subnet_proxy_test_tcp(listen_ip, target_ip, Duration::from_secs(5)).await; + subnet_proxy_test_udp(listen_ip, target_ip, Duration::from_secs(5)).await; } if enable_quic_proxy && !disable_quic_input { let metrics = insts[0] @@ -1373,10 +1385,7 @@ pub async fn port_forward_test( ) .await; - use crate::tunnel::{ - common::tests::_tunnel_pingpong_netns, tcp::TcpTunnelListener, udp::UdpTunnelConnector, - udp::UdpTunnelListener, - }; + use crate::tunnel::{tcp::TcpTunnelListener, udp::UdpTunnelConnector, udp::UdpTunnelListener}; let tcp_listener = TcpTunnelListener::new("tcp://0.0.0.0:23456".parse().unwrap()); let tcp_connector = TcpTunnelConnector::new("tcp://127.0.0.1:23456".parse().unwrap()); @@ -1384,14 +1393,16 @@ pub async fn port_forward_test( let mut buf = vec![0; buf_size as usize]; rand::thread_rng().fill(&mut buf[..]); - _tunnel_pingpong_netns( + _tunnel_pingpong_netns_with_timeout( tcp_listener, tcp_connector, NetNS::new(Some("net_c".into())), NetNS::new(Some("net_a".into())), buf, + Duration::from_secs(1), ) - .await; + .await + .unwrap(); let tcp_listener = TcpTunnelListener::new("tcp://0.0.0.0:23457".parse().unwrap()); let tcp_connector = TcpTunnelConnector::new("tcp://127.0.0.1:23457".parse().unwrap()); @@ -1399,14 +1410,16 @@ pub async fn port_forward_test( let mut buf = vec![0; buf_size as usize]; rand::thread_rng().fill(&mut buf[..]); - _tunnel_pingpong_netns( + _tunnel_pingpong_netns_with_timeout( tcp_listener, tcp_connector, NetNS::new(Some("net_d".into())), NetNS::new(Some("net_a".into())), buf, + Duration::from_secs(1), ) - .await; + .await + .unwrap(); let udp_listener = UdpTunnelListener::new("udp://0.0.0.0:23458".parse().unwrap()); let udp_connector = UdpTunnelConnector::new("udp://127.0.0.1:23458".parse().unwrap()); @@ -1414,14 +1427,16 @@ pub async fn port_forward_test( let mut buf = vec![0; buf_size as usize]; rand::thread_rng().fill(&mut buf[..]); - _tunnel_pingpong_netns( + _tunnel_pingpong_netns_with_timeout( udp_listener, udp_connector, NetNS::new(Some("net_c".into())), NetNS::new(Some("net_a".into())), buf, + Duration::from_secs(1), ) - .await; + .await + .unwrap(); let udp_listener = UdpTunnelListener::new("udp://0.0.0.0:23459".parse().unwrap()); let udp_connector = UdpTunnelConnector::new("udp://127.0.0.1:23459".parse().unwrap()); @@ -1429,14 +1444,16 @@ pub async fn port_forward_test( let mut buf = vec![0; buf_size as usize]; rand::thread_rng().fill(&mut buf[..]); - _tunnel_pingpong_netns( + _tunnel_pingpong_netns_with_timeout( udp_listener, udp_connector, NetNS::new(Some("net_d".into())), NetNS::new(Some("net_a".into())), buf, + Duration::from_secs(1), ) - .await; + .await + .unwrap(); drop_insts(_insts).await; } @@ -1607,7 +1624,7 @@ pub async fn acl_rule_test_inbound( #[values(true, false)] enable_quic_proxy: bool, ) { use crate::tunnel::{ - common::tests::_tunnel_pingpong_netns, + common::tests::_tunnel_pingpong_netns_with_timeout, tcp::{TcpTunnelConnector, TcpTunnelListener}, udp::{UdpTunnelConnector, UdpTunnelListener}, }; @@ -1707,43 +1724,38 @@ pub async fn acl_rule_test_inbound( rand::thread_rng().fill(&mut buf[..]); // 5. 8081 应该可以 pingpong 成功 - _tunnel_pingpong_netns( + let result = _tunnel_pingpong_netns_with_timeout( listener_8081, connector_8081, NetNS::new(Some("net_c".into())), NetNS::new(Some("net_a".into())), buf.clone(), + Duration::from_secs(5), + ) + .await; + assert!(result.is_ok(), "{}", result.unwrap_err()); + + // 6. 8080 应该连接失败(被 ACL 拦截) + let result = _tunnel_pingpong_netns_with_timeout( + listener_8080, + connector_8080, + NetNS::new(Some("net_c".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + Duration::from_millis(500), ) .await; - // 6. 8080 应该连接失败(被 ACL 拦截) - let result = tokio::spawn(tokio::time::timeout( - std::time::Duration::from_millis(200), - _tunnel_pingpong_netns( - listener_8080, - connector_8080, - NetNS::new(Some("net_c".into())), - NetNS::new(Some("net_a".into())), - buf.clone(), - ), - )) - .await; - - assert!( - result.is_err() || result.unwrap().is_err(), - "TCP 连接 8080 应被 ACL 拦截,不能成功" - ); + assert!(result.is_err(), "TCP 连接 8080 应被 ACL 拦截,不能成功"); // 7. 从 10.144.144.2 连接 8082 应该连接失败(被 ACL 拦截) - let result = tokio::time::timeout( - std::time::Duration::from_millis(200), - _tunnel_pingpong_netns( - listener_8082, - connector_8082, - NetNS::new(Some("net_c".into())), - NetNS::new(Some("net_b".into())), - buf.clone(), - ), + let result = _tunnel_pingpong_netns_with_timeout( + listener_8082, + connector_8082, + NetNS::new(Some("net_c".into())), + NetNS::new(Some("net_b".into())), + buf.clone(), + Duration::from_millis(500), ) .await; @@ -1770,25 +1782,25 @@ pub async fn acl_rule_test_inbound( rand::thread_rng().fill(&mut buf[..]); // 4. 8081 应该可以 pingpong 成功 - _tunnel_pingpong_netns( + let result = _tunnel_pingpong_netns_with_timeout( listener_8081, connector_8081, NetNS::new(Some("net_c".into())), NetNS::new(Some("net_a".into())), buf.clone(), + Duration::from_secs(5), ) .await; + assert!(result.is_ok(), "{}", result.unwrap_err()); // 5. 8080 应该连接失败(被 ACL 拦截) - let result = tokio::time::timeout( - std::time::Duration::from_millis(200), - _tunnel_pingpong_netns( - listener_8080, - connector_8080, - NetNS::new(Some("net_c".into())), - NetNS::new(Some("net_a".into())), - buf.clone(), - ), + let result = _tunnel_pingpong_netns_with_timeout( + listener_8080, + connector_8080, + NetNS::new(Some("net_c".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + Duration::from_millis(500), ) .await; @@ -1815,7 +1827,7 @@ pub async fn acl_rule_test_subnet_proxy( #[values(true, false)] enable_quic_proxy: bool, ) { use crate::tunnel::{ - common::tests::_tunnel_pingpong_netns, + common::tests::_tunnel_pingpong_netns_with_timeout, tcp::{TcpTunnelConnector, TcpTunnelListener}, udp::{UdpTunnelConnector, UdpTunnelListener}, }; @@ -1932,48 +1944,46 @@ pub async fn acl_rule_test_subnet_proxy( rand::thread_rng().fill(&mut buf[..]); // 8082 应该可以连接成功(不被 ACL 拦截) - _tunnel_pingpong_netns( + let result = _tunnel_pingpong_netns_with_timeout( listener_8082, connector_8082, NetNS::new(Some("net_d".into())), NetNS::new(Some("net_a".into())), buf.clone(), + Duration::from_secs(5), + ) + .await; + assert!(result.is_ok(), "{}", result.unwrap_err()); + + // 8080 应该连接失败(被 ACL 拦截 - 禁止访问子网代理的 8080) + let result = _tunnel_pingpong_netns_with_timeout( + listener_8080, + connector_8080, + NetNS::new(Some("net_d".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + Duration::from_millis(500), ) .await; - // 8080 应该连接失败(被 ACL 拦截 - 禁止访问子网代理的 8080) - let result = tokio::spawn(tokio::time::timeout( - std::time::Duration::from_millis(200), - _tunnel_pingpong_netns( - listener_8080, - connector_8080, - NetNS::new(Some("net_d".into())), - NetNS::new(Some("net_a".into())), - buf.clone(), - ), - )) - .await; - assert!( - result.is_err() || result.unwrap().is_err(), + result.is_err(), "TCP 连接子网代理 8080 应被 ACL 拦截,不能成功" ); // 8081 应该连接失败(被 ACL 拦截 - 禁止 inst1 访问子网代理的 8081) - let result = tokio::spawn(tokio::time::timeout( - std::time::Duration::from_millis(200), - _tunnel_pingpong_netns( - listener_8081, - connector_8081, - NetNS::new(Some("net_d".into())), - NetNS::new(Some("net_a".into())), - buf.clone(), - ), - )) + let result = _tunnel_pingpong_netns_with_timeout( + listener_8081, + connector_8081, + NetNS::new(Some("net_d".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + Duration::from_millis(500), + ) .await; assert!( - result.is_err() || result.unwrap().is_err(), + result.is_err(), "TCP 连接子网代理 8081 应被 ACL 拦截,不能成功" ); @@ -1993,25 +2003,25 @@ pub async fn acl_rule_test_subnet_proxy( rand::thread_rng().fill(&mut buf[..]); // 8082 应该可以连接成功 - _tunnel_pingpong_netns( + let result = _tunnel_pingpong_netns_with_timeout( listener_8082, connector_8082, NetNS::new(Some("net_d".into())), NetNS::new(Some("net_a".into())), buf.clone(), + Duration::from_secs(5), ) .await; + assert!(result.is_ok(), "{}", result.unwrap_err()); // 8080 应该连接失败(被 ACL 拦截) - let result = tokio::time::timeout( - std::time::Duration::from_millis(200), - _tunnel_pingpong_netns( - listener_8080, - connector_8080, - NetNS::new(Some("net_d".into())), - NetNS::new(Some("net_a".into())), - buf.clone(), - ), + let result = _tunnel_pingpong_netns_with_timeout( + listener_8080, + connector_8080, + NetNS::new(Some("net_d".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + Duration::from_millis(500), ) .await; @@ -2120,7 +2130,7 @@ pub async fn p2p_only_test( for target_ip in ["10.144.144.3", target_ip] { assert_panics_ext( || async { - subnet_proxy_test_icmp(target_ip).await; + subnet_proxy_test_icmp(target_ip, Duration::from_millis(100)).await; }, !has_p2p_conn, ) @@ -2133,7 +2143,7 @@ pub async fn p2p_only_test( }; assert_panics_ext( || async { - subnet_proxy_test_tcp(listen_ip, target_ip).await; + subnet_proxy_test_tcp(listen_ip, target_ip, Duration::from_millis(100)).await; }, !has_p2p_conn, ) @@ -2141,7 +2151,7 @@ pub async fn p2p_only_test( assert_panics_ext( || async { - subnet_proxy_test_udp(listen_ip, target_ip).await; + subnet_proxy_test_udp(listen_ip, target_ip, Duration::from_millis(100)).await; }, !has_p2p_conn, ) diff --git a/easytier/src/tunnel/common.rs b/easytier/src/tunnel/common.rs index 50b81e6b..b4c05ff5 100644 --- a/easytier/src/tunnel/common.rs +++ b/easytier/src/tunnel/common.rs @@ -495,17 +495,20 @@ pub mod tests { L: TunnelListener + Send + Sync + 'static, C: TunnelConnector + Send + Sync + 'static, { - _tunnel_pingpong_netns( + _tunnel_pingpong_netns_with_timeout( listener, connector, NetNS::new(None), NetNS::new(None), "12345678abcdefg".as_bytes().to_vec(), + // only used by tunnel test, so set a long timeout + tokio::time::Duration::from_secs(5), ) - .await; + .await + .unwrap(); } - pub(crate) async fn _tunnel_pingpong_netns( + async fn _tunnel_pingpong_netns( mut listener: L, mut connector: C, l_netns: NetNS,