diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index 7ec14f89..33a72375 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -71,6 +71,7 @@ pub fn gen_default_flags() -> Flags { need_p2p: false, instance_recv_bps_limit: u64::MAX, disable_upnp: false, + disable_relay_data: false, } } diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 69c4f17b..48645deb 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -217,6 +217,12 @@ pub struct GlobalCtx { flags: ArcSwap, + // Runtime/base advertised feature flags before config-owned fields are + // overlaid by set_flags. Keep this separate so config patches do not erase + // runtime state such as public-server role, IPv6 provider status, or the + // non-whitelist avoid-relay preference. + base_feature_flags: AtomicCell, + feature_flags: AtomicCell, token_bucket_manager: TokenBucketManager, @@ -247,8 +253,17 @@ impl std::fmt::Debug for GlobalCtx { pub type ArcGlobalCtx = std::sync::Arc; impl GlobalCtx { - fn derive_feature_flags(flags: &Flags, current: Option) -> PeerFeatureFlag { - let mut feature_flags = current.unwrap_or_default(); + fn apply_disable_relay_data_flag( + flags: &Flags, + mut feature_flags: PeerFeatureFlag, + ) -> PeerFeatureFlag { + if flags.disable_relay_data { + feature_flags.avoid_relay_data = true; + } + feature_flags + } + + fn derive_feature_flags(flags: &Flags, mut feature_flags: PeerFeatureFlag) -> PeerFeatureFlag { feature_flags.kcp_input = !flags.disable_kcp_input; feature_flags.no_relay_kcp = flags.disable_relay_kcp; feature_flags.support_conn_list_sync = true; @@ -256,7 +271,7 @@ impl GlobalCtx { feature_flags.no_relay_quic = flags.disable_relay_quic; feature_flags.need_p2p = flags.need_p2p; feature_flags.disable_p2p = flags.disable_p2p; - feature_flags + Self::apply_disable_relay_data_flag(flags, feature_flags) } pub fn new(config_fs: impl ConfigLoader + 'static) -> Self { @@ -285,7 +300,8 @@ impl GlobalCtx { let flags = config_fs.get_flags(); - let feature_flags = Self::derive_feature_flags(&flags, None); + let base_feature_flags = PeerFeatureFlag::default(); + let feature_flags = Self::derive_feature_flags(&flags, base_feature_flags); let credential_storage_path = config_fs.get_credential_file(); let credential_manager = Arc::new(CredentialManager::new(credential_storage_path)); @@ -318,6 +334,8 @@ impl GlobalCtx { flags: ArcSwap::new(Arc::new(flags)), + base_feature_flags: AtomicCell::new(base_feature_flags), + feature_flags: AtomicCell::new(feature_flags), token_bucket_manager: TokenBucketManager::new(), @@ -513,7 +531,7 @@ impl GlobalCtx { self.config.set_flags(flags.clone()); self.feature_flags.store(Self::derive_feature_flags( &flags, - Some(self.feature_flags.load()), + self.base_feature_flags.load(), )); self.flags.store(Arc::new(flags)); } @@ -578,8 +596,53 @@ impl GlobalCtx { self.feature_flags.load() } - pub fn set_feature_flags(&self, flags: PeerFeatureFlag) { - self.feature_flags.store(flags); + /// Replace the runtime/base advertised flags as a complete snapshot. + /// + /// This is intended for foreign scoped contexts that inherit an already + /// computed feature-flag snapshot from their parent. Most callers should use + /// a narrower setter so they do not accidentally overwrite unrelated runtime + /// state. + pub fn set_base_advertised_feature_flags(&self, feature_flags: PeerFeatureFlag) { + self.base_feature_flags.store(feature_flags); + let flags = self.flags.load(); + self.feature_flags + .store(Self::apply_disable_relay_data_flag( + flags.as_ref(), + feature_flags, + )); + } + + /// Set the avoid-relay preference that is independent of disable_relay_data. + /// + /// disable_relay_data still forces the effective advertised flag to true, + /// but this base preference is preserved when that config flag is toggled. + pub fn set_avoid_relay_data_preference(&self, avoid_relay_data: bool) -> bool { + let mut base_feature_flags = self.base_feature_flags.load(); + base_feature_flags.avoid_relay_data = avoid_relay_data; + self.base_feature_flags.store(base_feature_flags); + + let mut feature_flags = self.feature_flags.load(); + let previous = feature_flags.avoid_relay_data; + feature_flags.avoid_relay_data = avoid_relay_data || self.flags.load().disable_relay_data; + self.feature_flags.store(feature_flags); + previous != feature_flags.avoid_relay_data + } + + /// Set the runtime IPv6-provider advertised bit without touching + /// config-derived feature flags. + pub fn set_ipv6_public_addr_provider_feature_flag(&self, enabled: bool) -> bool { + let mut base_feature_flags = self.base_feature_flags.load(); + base_feature_flags.ipv6_public_addr_provider = enabled; + self.base_feature_flags.store(base_feature_flags); + + let mut feature_flags = self.feature_flags.load(); + if feature_flags.ipv6_public_addr_provider == enabled { + return false; + } + + feature_flags.ipv6_public_addr_provider = enabled; + self.feature_flags.store(feature_flags); + true } pub fn token_bucket_manager(&self) -> &TokenBucketManager { @@ -796,7 +859,7 @@ pub mod tests { let mut feature_flags = global_ctx.get_feature_flags(); feature_flags.avoid_relay_data = true; feature_flags.is_public_server = true; - global_ctx.set_feature_flags(feature_flags); + global_ctx.set_base_advertised_feature_flags(feature_flags); let mut flags = global_ctx.get_flags().clone(); flags.disable_kcp_input = true; @@ -820,6 +883,83 @@ pub mod tests { assert!(!feature_flags.ipv6_public_addr_provider); } + #[tokio::test] + async fn set_base_advertised_feature_flags_applies_current_values() { + let config = TomlConfigLoader::default(); + let global_ctx = GlobalCtx::new(config); + + let feature_flags = PeerFeatureFlag { + kcp_input: false, + no_relay_kcp: true, + quic_input: false, + no_relay_quic: true, + is_public_server: true, + ..Default::default() + }; + global_ctx.set_base_advertised_feature_flags(feature_flags); + + assert_eq!(global_ctx.get_feature_flags(), feature_flags); + } + + #[tokio::test] + async fn set_base_advertised_feature_flags_keeps_disable_relay_data_effective() { + let config = TomlConfigLoader::default(); + let global_ctx = GlobalCtx::new(config); + + let mut flags = global_ctx.get_flags().clone(); + flags.disable_relay_data = true; + global_ctx.set_flags(flags); + + let mut feature_flags = global_ctx.get_feature_flags(); + feature_flags.avoid_relay_data = false; + feature_flags.is_public_server = true; + global_ctx.set_base_advertised_feature_flags(feature_flags); + + let advertised_feature_flags = global_ctx.get_feature_flags(); + assert!(advertised_feature_flags.avoid_relay_data); + assert!(advertised_feature_flags.is_public_server); + + let mut flags = global_ctx.get_flags().clone(); + flags.disable_relay_data = false; + global_ctx.set_flags(flags); + + let advertised_feature_flags = global_ctx.get_feature_flags(); + assert!(!advertised_feature_flags.avoid_relay_data); + assert!(advertised_feature_flags.is_public_server); + } + + #[tokio::test] + async fn disable_relay_data_sets_avoid_relay_feature_flag() { + let config = TomlConfigLoader::default(); + let global_ctx = GlobalCtx::new(config); + + let mut flags = global_ctx.get_flags().clone(); + flags.disable_relay_data = true; + global_ctx.set_flags(flags); + + assert!(global_ctx.get_feature_flags().avoid_relay_data); + + let mut flags = global_ctx.get_flags().clone(); + flags.disable_relay_data = false; + global_ctx.set_flags(flags); + + assert!(!global_ctx.get_feature_flags().avoid_relay_data); + + global_ctx.set_avoid_relay_data_preference(true); + + let mut flags = global_ctx.get_flags().clone(); + flags.disable_relay_data = true; + global_ctx.set_flags(flags); + + assert!(global_ctx.get_feature_flags().avoid_relay_data); + + let mut flags = global_ctx.get_flags().clone(); + flags.disable_relay_data = false; + global_ctx.set_flags(flags); + + assert!(global_ctx.get_feature_flags().avoid_relay_data); + } + #[tokio::test] async fn should_deny_proxy_for_process_wide_rpc_port() { protected_port::clear_protected_tcp_ports_for_test(); diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index d827c14e..0f009088 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -340,6 +340,11 @@ impl InstanceConfigPatcher { global_ctx.set_ipv6(Some(ipv6.into())); global_ctx.config.set_ipv6(Some(ipv6.into())); } + if let Some(disable_relay_data) = patch.disable_relay_data { + let mut flags = global_ctx.get_flags(); + flags.disable_relay_data = disable_relay_data; + global_ctx.set_flags(flags); + } if let Some(enabled) = patch.ipv6_public_addr_provider { global_ctx.config.set_ipv6_public_addr_provider(enabled); provider_config_changed = true; diff --git a/easytier/src/instance/public_ipv6_provider.rs b/easytier/src/instance/public_ipv6_provider.rs index 369bbf55..999a9bb3 100644 --- a/easytier/src/instance/public_ipv6_provider.rs +++ b/easytier/src/instance/public_ipv6_provider.rs @@ -361,16 +361,8 @@ fn apply_public_ipv6_provider_runtime_state( let prefix_changed = global_ctx.set_advertised_ipv6_public_addr_prefix(next_prefix); let next_provider_enabled = matches!(state, PublicIpv6ProviderRuntimeState::Active(_)); - let feature_changed = { - let mut feature_flags = global_ctx.get_feature_flags(); - if feature_flags.ipv6_public_addr_provider == next_provider_enabled { - false - } else { - feature_flags.ipv6_public_addr_provider = next_provider_enabled; - global_ctx.set_feature_flags(feature_flags); - true - } - }; + let feature_changed = + global_ctx.set_ipv6_public_addr_provider_feature_flag(next_provider_enabled); prefix_changed || feature_changed } diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 82d5b02b..eff846ee 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -816,6 +816,10 @@ impl NetworkConfig { flags.disable_upnp = disable_upnp; } + if let Some(disable_relay_data) = self.disable_relay_data { + flags.disable_relay_data = disable_relay_data; + } + if let Some(disable_sym_hole_punching) = self.disable_sym_hole_punching { flags.disable_sym_hole_punching = disable_sym_hole_punching; } @@ -990,6 +994,7 @@ impl NetworkConfig { result.disable_tcp_hole_punching = Some(flags.disable_tcp_hole_punching); result.disable_udp_hole_punching = Some(flags.disable_udp_hole_punching); result.disable_upnp = Some(flags.disable_upnp); + result.disable_relay_data = Some(flags.disable_relay_data); result.disable_sym_hole_punching = Some(flags.disable_sym_hole_punching); result.enable_magic_dns = Some(flags.accept_dns); result.mtu = Some(flags.mtu as i32); diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 1fb6c22c..5f8943e0 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -56,7 +56,7 @@ use super::{ route_trait::NextHopPolicy, traffic_metrics::{ InstanceLabelKind, LogicalTrafficMetrics, TrafficKind, TrafficMetricRecorder, - route_peer_info_instance_id, traffic_kind, + is_relay_data_packet_type, route_peer_info_instance_id, traffic_kind, }, }; @@ -69,11 +69,16 @@ pub trait GlobalForeignNetworkAccessor: Send + Sync + 'static { struct ForeignNetworkEntry { my_peer_id: PeerId, + // Node-global runtime flags, such as disable_relay_data, live on the parent + // context. The foreign context is scoped to the foreign network's OSPF view. + parent_global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx, network: NetworkIdentity, peer_map: Arc, relay_peer_map: Arc, peer_session_store: Arc, + // Static per-network permission from the whitelist check. disable_relay_data + // is the node-wide runtime override layered on top of this value. relay_data: bool, pm_packet_sender: Mutex>, @@ -205,6 +210,7 @@ impl ForeignNetworkEntry { Self { my_peer_id, + parent_global_ctx: global_ctx.clone(), global_ctx: foreign_global_ctx, network, peer_map, @@ -231,6 +237,27 @@ impl ForeignNetworkEntry { } } + fn desired_avoid_relay_data_feature_flag( + parent_global_ctx: &ArcGlobalCtx, + relay_data: bool, + ) -> bool { + !relay_data || parent_global_ctx.get_feature_flags().avoid_relay_data + } + + fn sync_parent_relay_data_feature_flag( + parent_global_ctx: &ArcGlobalCtx, + global_ctx: &ArcGlobalCtx, + relay_data: bool, + ) -> bool { + let avoid_relay_data = + Self::desired_avoid_relay_data_feature_flag(parent_global_ctx, relay_data); + if global_ctx.get_feature_flags().avoid_relay_data == avoid_relay_data { + return false; + } + + global_ctx.set_avoid_relay_data_preference(avoid_relay_data) + } + fn build_foreign_global_ctx( network: &NetworkIdentity, global_ctx: ArcGlobalCtx, @@ -258,10 +285,9 @@ impl ForeignNetworkEntry { let mut feature_flag = global_ctx.get_feature_flags(); feature_flag.is_public_server = true; - if !relay_data { - feature_flag.avoid_relay_data = true; - } - foreign_global_ctx.set_feature_flags(feature_flag); + feature_flag.avoid_relay_data = + Self::desired_avoid_relay_data_feature_flag(&global_ctx, relay_data); + foreign_global_ctx.set_base_advertised_feature_flags(feature_flag); for u in global_ctx.get_running_listeners().into_iter() { foreign_global_ctx.add_running_listener(u); @@ -412,6 +438,7 @@ impl ForeignNetworkEntry { let peer_map = self.peer_map.clone(); let relay_peer_map = self.relay_peer_map.clone(); let traffic_metrics = self.traffic_metrics.clone(); + let parent_global_ctx = self.parent_global_ctx.clone(); let relay_data = self.relay_data; let pm_sender = self.pm_packet_sender.lock().await.take().unwrap(); let network_name = self.network.network_name.clone(); @@ -497,11 +524,16 @@ impl ForeignNetworkEntry { "ignore packet in foreign network" ); } else { - if packet_type == PacketType::Data as u8 - || packet_type == PacketType::KcpSrc as u8 - || packet_type == PacketType::KcpDst as u8 - { - if !relay_data { + if is_relay_data_packet_type(packet_type) { + let disable_relay_data = parent_global_ctx.flags_arc().disable_relay_data; + if !relay_data || disable_relay_data { + tracing::debug!( + ?from_peer_id, + ?to_peer_id, + packet_type, + disable_relay_data, + "drop foreign network relay data" + ); continue; } if !bps_limiter.try_consume(len.into()) { @@ -589,10 +621,31 @@ impl ForeignNetworkEntry { }); } + async fn run_parent_feature_flag_sync_routine(&self) { + let parent_global_ctx = self.parent_global_ctx.clone(); + let global_ctx = self.global_ctx.clone(); + let relay_data = self.relay_data; + self.tasks.lock().await.spawn(async move { + let mut parent_events = parent_global_ctx.subscribe(); + loop { + ForeignNetworkEntry::sync_parent_relay_data_feature_flag( + &parent_global_ctx, + &global_ctx, + relay_data, + ); + + if parent_events.recv().await.is_err() { + parent_events = parent_global_ctx.subscribe(); + } + } + }); + } + async fn prepare(&self, accessor: Box) { self.prepare_route(accessor).await; self.start_packet_recv().await; self.run_relay_session_gc_routine().await; + self.run_parent_feature_flag_sync_routine().await; self.peer_rpc.run(); self.peer_center.init().await; } @@ -1397,6 +1450,92 @@ pub mod tests { ); } + #[tokio::test] + async fn disable_relay_data_blocks_foreign_network_transit_data() { + let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + + connect_peer_manager(pma_net1.clone(), pm_center.clone()).await; + connect_peer_manager(pmb_net1.clone(), pm_center.clone()).await; + wait_route_appear(pma_net1.clone(), pmb_net1.clone()) + .await + .unwrap(); + + let mut flags = pm_center.get_global_ctx().get_flags(); + flags.disable_relay_data = true; + pm_center.get_global_ctx().set_flags(flags); + pm_center + .get_global_ctx() + .issue_event(GlobalCtxEvent::ConfigPatched(Default::default())); + + let center_peer_id = pm_center + .get_foreign_network_manager() + .get_network_peer_id("net1") + .unwrap(); + wait_for_condition( + || { + let pma_net1 = pma_net1.clone(); + async move { + pma_net1.list_routes().await.iter().any(|route| { + route.peer_id == center_peer_id + && route + .feature_flag + .as_ref() + .map(|flag| flag.avoid_relay_data) + .unwrap_or(false) + }) + } + }, + Duration::from_secs(5), + ) + .await; + + let network_labels = + LabelSet::new().with_label_type(LabelType::NetworkName("net1".to_string())); + let forwarded_bytes_before = metric_value( + &pm_center, + MetricName::TrafficBytesForwarded, + network_labels.clone(), + ); + let forwarded_packets_before = metric_value( + &pm_center, + MetricName::TrafficPacketsForwarded, + network_labels.clone(), + ); + + let mut transit_pkt = ZCPacket::new_with_payload(b"foreign-transit-disabled"); + transit_pkt.fill_peer_manager_hdr( + pma_net1.my_peer_id(), + pmb_net1.my_peer_id(), + PacketType::Data as u8, + ); + pma_net1 + .get_foreign_network_client() + .send_msg(transit_pkt, center_peer_id) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(300)).await; + + assert_eq!( + metric_value( + &pm_center, + MetricName::TrafficBytesForwarded, + network_labels.clone() + ), + forwarded_bytes_before + ); + assert_eq!( + metric_value( + &pm_center, + MetricName::TrafficPacketsForwarded, + network_labels + ), + forwarded_packets_before + ); + } + #[tokio::test] async fn foreign_network_transit_control_forwarding_records_control_forwarded_metrics() { let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; @@ -1409,6 +1548,10 @@ pub mod tests { .await .unwrap(); + let mut flags = pm_center.get_global_ctx().get_flags(); + flags.disable_relay_data = true; + pm_center.get_global_ctx().set_flags(flags); + let center_peer_id = pm_center .get_foreign_network_manager() .get_network_peer_id("net1") @@ -1657,6 +1800,81 @@ pub mod tests { )); } + #[tokio::test] + async fn foreign_entry_feature_flag_tracks_parent_disable_relay_data_toggle() { + let global_ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new( + "__access__".to_string(), + "access_secret".to_string(), + ))); + let foreign_network = NetworkIdentity::new("net1".to_string(), "net1_secret".to_string()); + let (pm_packet_sender, _pm_packet_recv) = create_packet_recv_chan(); + let entry = ForeignNetworkEntry::new( + foreign_network, + 1, + global_ctx.clone(), + true, + Arc::new(PeerSessionStore::new()), + pm_packet_sender, + ); + assert!(!entry.global_ctx.get_feature_flags().avoid_relay_data); + + entry.run_parent_feature_flag_sync_routine().await; + + let mut flags = global_ctx.get_flags(); + flags.disable_relay_data = true; + global_ctx.set_flags(flags); + global_ctx.issue_event(GlobalCtxEvent::ConfigPatched(Default::default())); + + wait_for_condition( + || async { entry.global_ctx.get_feature_flags().avoid_relay_data }, + Duration::from_secs(2), + ) + .await; + + let mut flags = global_ctx.get_flags(); + flags.disable_relay_data = false; + global_ctx.set_flags(flags); + global_ctx.issue_event(GlobalCtxEvent::ConfigPatched(Default::default())); + + wait_for_condition( + || async { !entry.global_ctx.get_feature_flags().avoid_relay_data }, + Duration::from_secs(2), + ) + .await; + } + + #[tokio::test] + async fn foreign_entry_without_relay_data_keeps_avoid_feature_flag() { + let global_ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new( + "__access__".to_string(), + "access_secret".to_string(), + ))); + let foreign_network = NetworkIdentity::new("net1".to_string(), "net1_secret".to_string()); + let (pm_packet_sender, _pm_packet_recv) = create_packet_recv_chan(); + let entry = ForeignNetworkEntry::new( + foreign_network, + 1, + global_ctx.clone(), + false, + Arc::new(PeerSessionStore::new()), + pm_packet_sender, + ); + + assert!(entry.global_ctx.get_feature_flags().avoid_relay_data); + + let mut flags = global_ctx.get_flags(); + flags.disable_relay_data = false; + global_ctx.set_flags(flags); + + ForeignNetworkEntry::sync_parent_relay_data_feature_flag( + &global_ctx, + &entry.global_ctx, + entry.relay_data, + ); + + assert!(entry.global_ctx.get_feature_flags().avoid_relay_data); + } + #[test] fn credential_trust_path_rejects_admin_identity() { assert!(ForeignNetworkManager::should_reject_credential_trust_path( diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index e48cb700..8688438b 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -38,7 +38,7 @@ use crate::{ route_trait::{ForeignNetworkRouteInfoMap, MockRoute, NextHopPolicy, RouteInterface}, traffic_metrics::{ InstanceLabelKind, LogicalTrafficMetrics, TrafficKind, TrafficMetricRecorder, - route_peer_info_instance_id, traffic_kind, + is_relay_data_packet_type, route_peer_info_instance_id, traffic_kind, }, }, proto::{ @@ -263,9 +263,7 @@ impl PeerManager { .is_err() { // if local network is not in whitelist, avoid relay data when exist any other route path - let mut f = global_ctx.get_feature_flags(); - f.avoid_relay_data = true; - global_ctx.set_feature_flags(f); + global_ctx.set_avoid_relay_data_preference(true); } let is_secure_mode_enabled = global_ctx @@ -774,6 +772,7 @@ impl PeerManager { my_peer_id: PeerId, peer_map: &PeerMap, foreign_network_mgr: &ForeignNetworkManager, + disable_relay_data: bool, ) -> Result<(), ZCPacket> { let pm_header = packet.peer_manager_header().unwrap(); if pm_header.packet_type != PacketType::ForeignNetworkPacket as u8 { @@ -783,6 +782,16 @@ impl PeerManager { let from_peer_id = pm_header.from_peer_id.get(); let to_peer_id = pm_header.to_peer_id.get(); + if disable_relay_data && Self::is_relay_data_zc_packet(&packet) { + tracing::debug!( + ?from_peer_id, + ?to_peer_id, + inner_packet_type = ?packet.foreign_network_inner_packet_type(), + "drop foreign network relay data while relay data is disabled" + ); + return Ok(()); + } + let foreign_hdr = packet.foreign_network_hdr().unwrap(); let foreign_network_name = foreign_hdr.get_network_name(packet.payload()); let foreign_peer_id = foreign_hdr.get_dst_peer_id(); @@ -872,6 +881,29 @@ impl PeerManager { } } + fn is_relay_data_packet(packet_type: u8) -> bool { + is_relay_data_packet_type(packet_type) + } + + fn is_relay_data_zc_packet(packet: &ZCPacket) -> bool { + let Some(hdr) = packet.peer_manager_header() else { + return false; + }; + + if hdr.packet_type == PacketType::ForeignNetworkPacket as u8 { + let inner_packet_type = packet.foreign_network_inner_packet_type(); + if inner_packet_type.is_none() { + tracing::warn!( + ?hdr, + "foreign network packet has unparseable inner peer manager header" + ); + } + return inner_packet_type.is_none_or(Self::is_relay_data_packet); + } + + Self::is_relay_data_packet(hdr.packet_type) + } + async fn start_peer_recv(&self) { let mut recv = self.packet_recv.lock().await.take().unwrap(); let my_peer_id = self.my_peer_id; @@ -925,14 +957,21 @@ impl PeerManager { self.tasks.lock().await.spawn(async move { tracing::trace!("start_peer_recv"); while let Ok(ret) = recv_packet_from_chan(&mut recv).await { - let Err(mut ret) = - Self::try_handle_foreign_network_packet(ret, my_peer_id, &peers, &foreign_mgr) - .await + let disable_relay_data = global_ctx.flags_arc().disable_relay_data; + let Err(mut ret) = Self::try_handle_foreign_network_packet( + ret, + my_peer_id, + &peers, + &foreign_mgr, + disable_relay_data, + ) + .await else { continue; }; let buf_len = ret.buf_len(); + let is_relay_data_packet = Self::is_relay_data_zc_packet(&ret); let Some(hdr) = ret.mut_peer_manager_header() else { tracing::warn!(?ret, "invalid packet, skip"); continue; @@ -944,6 +983,16 @@ impl PeerManager { let packet_type = hdr.packet_type; let is_encrypted = hdr.is_encrypted(); if to_peer_id != my_peer_id { + if disable_relay_data && is_relay_data_packet { + tracing::debug!( + ?from_peer_id, + ?to_peer_id, + packet_type, + "drop forwarded relay data while relay data is disabled" + ); + continue; + } + if hdr.forward_counter > 7 { tracing::warn!(?hdr, "forward counter exceed, drop packet"); continue; @@ -2080,7 +2129,7 @@ mod tests { }, }, proto::{ - common::{CompressionAlgoPb, NatType, PeerFeatureFlag}, + common::{CompressionAlgoPb, NatType}, peer_rpc::SecureAuthLevel, }, tunnel::{ @@ -2224,6 +2273,84 @@ mod tests { assert_eq!(signal.version(), initial_version + 2); } + #[test] + fn disable_relay_data_classifies_data_plane_packets_only() { + for packet_type in [ + PacketType::Data, + PacketType::KcpSrc, + PacketType::KcpDst, + PacketType::QuicSrc, + PacketType::QuicDst, + PacketType::DataWithKcpSrcModified, + PacketType::DataWithQuicSrcModified, + PacketType::RelayHandshake, + PacketType::RelayHandshakeAck, + PacketType::ForeignNetworkPacket, + ] { + assert!(PeerManager::is_relay_data_packet(packet_type as u8)); + } + + for packet_type in [ + PacketType::RpcReq, + PacketType::RpcResp, + PacketType::Ping, + PacketType::Pong, + PacketType::HandShake, + PacketType::NoiseHandshakeMsg1, + PacketType::NoiseHandshakeMsg2, + PacketType::NoiseHandshakeMsg3, + ] { + assert!(!PeerManager::is_relay_data_packet(packet_type as u8)); + } + } + + #[test] + fn disable_relay_data_inspects_foreign_network_inner_packet_type() { + let network_name = "net1".to_string(); + + let mut rpc_packet = ZCPacket::new_with_payload(b"rpc"); + rpc_packet.fill_peer_manager_hdr(1, 2, PacketType::RpcReq as u8); + let mut foreign_rpc_packet = + ZCPacket::new_for_foreign_network(&network_name, 2, &rpc_packet); + foreign_rpc_packet.fill_peer_manager_hdr(10, 20, PacketType::ForeignNetworkPacket as u8); + + assert_eq!( + foreign_rpc_packet.foreign_network_inner_packet_type(), + Some(PacketType::RpcReq as u8) + ); + assert!(!PeerManager::is_relay_data_zc_packet(&foreign_rpc_packet)); + + let mut data_packet = ZCPacket::new_with_payload(b"data"); + data_packet.fill_peer_manager_hdr(1, 2, PacketType::Data as u8); + let mut foreign_data_packet = + ZCPacket::new_for_foreign_network(&network_name, 2, &data_packet); + foreign_data_packet.fill_peer_manager_hdr(10, 20, PacketType::ForeignNetworkPacket as u8); + + assert_eq!( + foreign_data_packet.foreign_network_inner_packet_type(), + Some(PacketType::Data as u8) + ); + assert!(PeerManager::is_relay_data_zc_packet(&foreign_data_packet)); + } + + #[tokio::test] + async fn non_whitelisted_network_avoid_relay_survives_disable_relay_data_toggle() { + let global_ctx = get_mock_global_ctx(); + let mut flags = global_ctx.get_flags(); + flags.disable_relay_data = true; + flags.relay_network_whitelist = "other-network".to_string(); + global_ctx.set_flags(flags); + + let (packet_send, _packet_recv) = create_packet_recv_chan(); + let _peer_mgr = PeerManager::new(RouteAlgoType::Ospf, global_ctx.clone(), packet_send); + + let mut flags = global_ctx.get_flags(); + flags.disable_relay_data = false; + global_ctx.set_flags(flags); + + assert!(global_ctx.get_feature_flags().avoid_relay_data); + } + #[tokio::test] async fn send_msg_internal_does_not_record_tx_metrics_on_failed_delivery() { let peer_mgr = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; @@ -3121,10 +3248,7 @@ mod tests { // when b's avoid_relay_data is true, a->c should route through d and e, cost is 3 peer_mgr_b .get_global_ctx() - .set_feature_flags(PeerFeatureFlag { - avoid_relay_data: true, - ..Default::default() - }); + .set_avoid_relay_data_preference(true); tokio::time::sleep(Duration::from_secs(2)).await; if wait_route_appear_with_cost(peer_mgr_a.clone(), peer_mgr_c.my_peer_id, Some(3)) .await diff --git a/easytier/src/peers/traffic_metrics.rs b/easytier/src/peers/traffic_metrics.rs index ecd5c5d4..4822a383 100644 --- a/easytier/src/peers/traffic_metrics.rs +++ b/easytier/src/peers/traffic_metrics.rs @@ -241,6 +241,13 @@ pub(crate) fn traffic_kind(packet_type: u8) -> TrafficKind { } } +pub(crate) fn is_relay_data_packet_type(packet_type: u8) -> bool { + traffic_kind(packet_type) == TrafficKind::Data + || packet_type == PacketType::RelayHandshake as u8 + || packet_type == PacketType::RelayHandshakeAck as u8 + || packet_type == PacketType::ForeignNetworkPacket as u8 +} + #[derive(Clone)] struct TrafficMetricGroup { data: Arc, diff --git a/easytier/src/proto/api_config.proto b/easytier/src/proto/api_config.proto index 131b8eb3..ccfa30cc 100644 --- a/easytier/src/proto/api_config.proto +++ b/easytier/src/proto/api_config.proto @@ -27,6 +27,7 @@ message InstanceConfigPatch { optional bool ipv6_public_addr_provider = 11; optional bool ipv6_public_addr_auto = 12; optional string ipv6_public_addr_prefix = 13; + optional bool disable_relay_data = 14; } message PortForwardPatch { diff --git a/easytier/src/proto/api_manage.proto b/easytier/src/proto/api_manage.proto index f09f52a8..0b1a472a 100644 --- a/easytier/src/proto/api_manage.proto +++ b/easytier/src/proto/api_manage.proto @@ -99,6 +99,7 @@ message NetworkConfig { optional bool ipv6_public_addr_provider = 62; optional bool ipv6_public_addr_auto = 63; optional string ipv6_public_addr_prefix = 64; + optional bool disable_relay_data = 65; } message PortForwardConfig { diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index ae5098da..15ed939a 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -75,6 +75,7 @@ message FlagsInConfig { bool need_p2p = 38; uint64 instance_recv_bps_limit = 39; bool disable_upnp = 40; + bool disable_relay_data = 41; } message RpcDescriptor { diff --git a/easytier/src/tests/credential_tests.rs b/easytier/src/tests/credential_tests.rs index 235b0732..17a77b54 100644 --- a/easytier/src/tests/credential_tests.rs +++ b/easytier/src/tests/credential_tests.rs @@ -501,10 +501,10 @@ async fn credential_relay_capability(#[case] allow_relay: bool) { // Create admin node let admin_config = create_admin_config("admin", Some("ns_adm"), "10.144.144.1", "fd00::1/64"); let mut admin_inst = Instance::new(admin_config); - let mut ff = admin_inst.get_global_ctx().get_feature_flags(); // if cred c allow relay, we set admin inst avoid relay (if other same-cost path available, admin will not relay data) - ff.avoid_relay_data = allow_relay; - admin_inst.get_global_ctx().set_feature_flags(ff); + admin_inst + .get_global_ctx() + .set_avoid_relay_data_preference(allow_relay); admin_inst.run().await.unwrap(); let admin_peer_id = admin_inst.peer_id(); diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index ab2f4be3..c5ac99b9 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -3730,6 +3730,153 @@ pub async fn config_patch_test() { drop_insts(insts).await; } +#[rstest::rstest] +#[tokio::test] +#[serial_test::serial] +pub async fn config_patch_disable_relay_data_test() { + use crate::proto::api::config::InstanceConfigPatch; + + let insts = init_three_node_ex( + "udp", + |cfg| { + cfg.set_ipv6(None); + cfg + }, + false, + ) + .await; + + let relay_peer_id = insts[1].peer_id(); + let dst_peer_id = insts[2].peer_id(); + assert!(!insts[1].get_global_ctx().get_flags().disable_relay_data); + assert!( + !insts[1] + .get_global_ctx() + .get_feature_flags() + .avoid_relay_data + ); + + check_route_ex( + insts[0].get_peer_manager().list_routes().await, + dst_peer_id, + |route| { + assert_eq!(route.next_hop_peer_id, relay_peer_id); + true + }, + ); + + wait_for_condition( + || async { ping_test("net_a", "10.144.144.3", None).await }, + Duration::from_secs(5), + ) + .await; + + insts[1] + .get_config_patcher() + .apply_patch(InstanceConfigPatch { + disable_relay_data: Some(true), + ..Default::default() + }) + .await + .unwrap(); + + assert!(insts[1].get_global_ctx().get_flags().disable_relay_data); + assert!( + insts[1] + .get_global_ctx() + .config + .get_flags() + .disable_relay_data + ); + assert!( + insts[1] + .get_global_ctx() + .get_feature_flags() + .avoid_relay_data + ); + + wait_for_condition( + || { + let peer_mgr = insts[0].get_peer_manager().clone(); + async move { + peer_mgr.list_routes().await.iter().any(|route| { + route.peer_id == relay_peer_id + && route + .feature_flag + .as_ref() + .map(|flag| flag.avoid_relay_data) + .unwrap_or(false) + }) + } + }, + Duration::from_secs(5), + ) + .await; + + check_route_ex( + insts[0].get_peer_manager().list_routes().await, + dst_peer_id, + |route| { + assert_eq!(route.next_hop_peer_id, relay_peer_id); + true + }, + ); + assert!( + !ping_test("net_a", "10.144.144.3", None).await, + "traffic from inst1 to inst3 should be blocked while inst2 relay data is disabled" + ); + + insts[1] + .get_config_patcher() + .apply_patch(InstanceConfigPatch { + disable_relay_data: Some(false), + ..Default::default() + }) + .await + .unwrap(); + + assert!(!insts[1].get_global_ctx().get_flags().disable_relay_data); + assert!( + !insts[1] + .get_global_ctx() + .config + .get_flags() + .disable_relay_data + ); + assert!( + !insts[1] + .get_global_ctx() + .get_feature_flags() + .avoid_relay_data + ); + + wait_for_condition( + || { + let peer_mgr = insts[0].get_peer_manager().clone(); + async move { + peer_mgr.list_routes().await.iter().any(|route| { + route.peer_id == relay_peer_id + && route + .feature_flag + .as_ref() + .map(|flag| !flag.avoid_relay_data) + .unwrap_or(false) + }) + } + }, + Duration::from_secs(5), + ) + .await; + + wait_for_condition( + || async { ping_test("net_a", "10.144.144.3", None).await }, + Duration::from_secs(5), + ) + .await; + + drop_insts(insts).await; +} + /// Generate SecureModeConfig with specified x25519 private key pub fn generate_secure_mode_config_with_key( private_key: &x25519_dalek::StaticSecret, diff --git a/easytier/src/tunnel/packet_def.rs b/easytier/src/tunnel/packet_def.rs index 7d78bbf5..10cf5f4d 100644 --- a/easytier/src/tunnel/packet_def.rs +++ b/easytier/src/tunnel/packet_def.rs @@ -730,6 +730,17 @@ impl ZCPacket { } } + pub fn foreign_network_inner_packet_type(&self) -> Option { + if self.peer_manager_header()?.packet_type != PacketType::ForeignNetworkPacket as u8 { + return None; + } + + let payload = self.payload(); + let hdr = ForeignNetworkPacketHeader::ref_from_prefix(payload)?; + let inner_packet = payload.get(hdr.get_header_len()..)?; + PeerManagerHeader::ref_from_prefix(inner_packet).map(|hdr| hdr.packet_type) + } + pub fn foreign_network_packet(mut self) -> Self { let hdr = self.foreign_network_hdr().unwrap(); let foreign_hdr_len = hdr.get_header_len();