feat: support disabling relay data forwarding (#2188)

- add a disable_relay_data runtime/config patch option
- reuse the existing avoid_relay_data feature flag when relay data forwarding is disabled
This commit is contained in:
KKRainbow
2026-04-30 19:44:40 +08:00
committed by GitHub
parent ed8df2d58f
commit 97c8c4f55a
14 changed files with 696 additions and 43 deletions
+1
View File
@@ -71,6 +71,7 @@ pub fn gen_default_flags() -> Flags {
need_p2p: false, need_p2p: false,
instance_recv_bps_limit: u64::MAX, instance_recv_bps_limit: u64::MAX,
disable_upnp: false, disable_upnp: false,
disable_relay_data: false,
} }
} }
+148 -8
View File
@@ -217,6 +217,12 @@ pub struct GlobalCtx {
flags: ArcSwap<Flags>, flags: ArcSwap<Flags>,
// Runtime/base advertised feature flags before config-owned fields are
// overlaid by set_flags. Keep this separate so config patches do not erase
// runtime state such as public-server role, IPv6 provider status, or the
// non-whitelist avoid-relay preference.
base_feature_flags: AtomicCell<PeerFeatureFlag>,
feature_flags: AtomicCell<PeerFeatureFlag>, feature_flags: AtomicCell<PeerFeatureFlag>,
token_bucket_manager: TokenBucketManager, token_bucket_manager: TokenBucketManager,
@@ -247,8 +253,17 @@ impl std::fmt::Debug for GlobalCtx {
pub type ArcGlobalCtx = std::sync::Arc<GlobalCtx>; pub type ArcGlobalCtx = std::sync::Arc<GlobalCtx>;
impl GlobalCtx { impl GlobalCtx {
fn derive_feature_flags(flags: &Flags, current: Option<PeerFeatureFlag>) -> PeerFeatureFlag { fn apply_disable_relay_data_flag(
let mut feature_flags = current.unwrap_or_default(); flags: &Flags,
mut feature_flags: PeerFeatureFlag,
) -> PeerFeatureFlag {
if flags.disable_relay_data {
feature_flags.avoid_relay_data = true;
}
feature_flags
}
fn derive_feature_flags(flags: &Flags, mut feature_flags: PeerFeatureFlag) -> PeerFeatureFlag {
feature_flags.kcp_input = !flags.disable_kcp_input; feature_flags.kcp_input = !flags.disable_kcp_input;
feature_flags.no_relay_kcp = flags.disable_relay_kcp; feature_flags.no_relay_kcp = flags.disable_relay_kcp;
feature_flags.support_conn_list_sync = true; feature_flags.support_conn_list_sync = true;
@@ -256,7 +271,7 @@ impl GlobalCtx {
feature_flags.no_relay_quic = flags.disable_relay_quic; feature_flags.no_relay_quic = flags.disable_relay_quic;
feature_flags.need_p2p = flags.need_p2p; feature_flags.need_p2p = flags.need_p2p;
feature_flags.disable_p2p = flags.disable_p2p; feature_flags.disable_p2p = flags.disable_p2p;
feature_flags Self::apply_disable_relay_data_flag(flags, feature_flags)
} }
pub fn new(config_fs: impl ConfigLoader + 'static) -> Self { pub fn new(config_fs: impl ConfigLoader + 'static) -> Self {
@@ -285,7 +300,8 @@ impl GlobalCtx {
let flags = config_fs.get_flags(); let flags = config_fs.get_flags();
let feature_flags = Self::derive_feature_flags(&flags, None); let base_feature_flags = PeerFeatureFlag::default();
let feature_flags = Self::derive_feature_flags(&flags, base_feature_flags);
let credential_storage_path = config_fs.get_credential_file(); let credential_storage_path = config_fs.get_credential_file();
let credential_manager = Arc::new(CredentialManager::new(credential_storage_path)); let credential_manager = Arc::new(CredentialManager::new(credential_storage_path));
@@ -318,6 +334,8 @@ impl GlobalCtx {
flags: ArcSwap::new(Arc::new(flags)), flags: ArcSwap::new(Arc::new(flags)),
base_feature_flags: AtomicCell::new(base_feature_flags),
feature_flags: AtomicCell::new(feature_flags), feature_flags: AtomicCell::new(feature_flags),
token_bucket_manager: TokenBucketManager::new(), token_bucket_manager: TokenBucketManager::new(),
@@ -513,7 +531,7 @@ impl GlobalCtx {
self.config.set_flags(flags.clone()); self.config.set_flags(flags.clone());
self.feature_flags.store(Self::derive_feature_flags( self.feature_flags.store(Self::derive_feature_flags(
&flags, &flags,
Some(self.feature_flags.load()), self.base_feature_flags.load(),
)); ));
self.flags.store(Arc::new(flags)); self.flags.store(Arc::new(flags));
} }
@@ -578,8 +596,53 @@ impl GlobalCtx {
self.feature_flags.load() self.feature_flags.load()
} }
pub fn set_feature_flags(&self, flags: PeerFeatureFlag) { /// Replace the runtime/base advertised flags as a complete snapshot.
self.feature_flags.store(flags); ///
/// This is intended for foreign scoped contexts that inherit an already
/// computed feature-flag snapshot from their parent. Most callers should use
/// a narrower setter so they do not accidentally overwrite unrelated runtime
/// state.
pub fn set_base_advertised_feature_flags(&self, feature_flags: PeerFeatureFlag) {
self.base_feature_flags.store(feature_flags);
let flags = self.flags.load();
self.feature_flags
.store(Self::apply_disable_relay_data_flag(
flags.as_ref(),
feature_flags,
));
}
/// Set the avoid-relay preference that is independent of disable_relay_data.
///
/// disable_relay_data still forces the effective advertised flag to true,
/// but this base preference is preserved when that config flag is toggled.
pub fn set_avoid_relay_data_preference(&self, avoid_relay_data: bool) -> bool {
let mut base_feature_flags = self.base_feature_flags.load();
base_feature_flags.avoid_relay_data = avoid_relay_data;
self.base_feature_flags.store(base_feature_flags);
let mut feature_flags = self.feature_flags.load();
let previous = feature_flags.avoid_relay_data;
feature_flags.avoid_relay_data = avoid_relay_data || self.flags.load().disable_relay_data;
self.feature_flags.store(feature_flags);
previous != feature_flags.avoid_relay_data
}
/// Set the runtime IPv6-provider advertised bit without touching
/// config-derived feature flags.
pub fn set_ipv6_public_addr_provider_feature_flag(&self, enabled: bool) -> bool {
let mut base_feature_flags = self.base_feature_flags.load();
base_feature_flags.ipv6_public_addr_provider = enabled;
self.base_feature_flags.store(base_feature_flags);
let mut feature_flags = self.feature_flags.load();
if feature_flags.ipv6_public_addr_provider == enabled {
return false;
}
feature_flags.ipv6_public_addr_provider = enabled;
self.feature_flags.store(feature_flags);
true
} }
pub fn token_bucket_manager(&self) -> &TokenBucketManager { pub fn token_bucket_manager(&self) -> &TokenBucketManager {
@@ -796,7 +859,7 @@ pub mod tests {
let mut feature_flags = global_ctx.get_feature_flags(); let mut feature_flags = global_ctx.get_feature_flags();
feature_flags.avoid_relay_data = true; feature_flags.avoid_relay_data = true;
feature_flags.is_public_server = true; feature_flags.is_public_server = true;
global_ctx.set_feature_flags(feature_flags); global_ctx.set_base_advertised_feature_flags(feature_flags);
let mut flags = global_ctx.get_flags().clone(); let mut flags = global_ctx.get_flags().clone();
flags.disable_kcp_input = true; flags.disable_kcp_input = true;
@@ -820,6 +883,83 @@ pub mod tests {
assert!(!feature_flags.ipv6_public_addr_provider); assert!(!feature_flags.ipv6_public_addr_provider);
} }
#[tokio::test]
async fn set_base_advertised_feature_flags_applies_current_values() {
let config = TomlConfigLoader::default();
let global_ctx = GlobalCtx::new(config);
let feature_flags = PeerFeatureFlag {
kcp_input: false,
no_relay_kcp: true,
quic_input: false,
no_relay_quic: true,
is_public_server: true,
..Default::default()
};
global_ctx.set_base_advertised_feature_flags(feature_flags);
assert_eq!(global_ctx.get_feature_flags(), feature_flags);
}
#[tokio::test]
async fn set_base_advertised_feature_flags_keeps_disable_relay_data_effective() {
let config = TomlConfigLoader::default();
let global_ctx = GlobalCtx::new(config);
let mut flags = global_ctx.get_flags().clone();
flags.disable_relay_data = true;
global_ctx.set_flags(flags);
let mut feature_flags = global_ctx.get_feature_flags();
feature_flags.avoid_relay_data = false;
feature_flags.is_public_server = true;
global_ctx.set_base_advertised_feature_flags(feature_flags);
let advertised_feature_flags = global_ctx.get_feature_flags();
assert!(advertised_feature_flags.avoid_relay_data);
assert!(advertised_feature_flags.is_public_server);
let mut flags = global_ctx.get_flags().clone();
flags.disable_relay_data = false;
global_ctx.set_flags(flags);
let advertised_feature_flags = global_ctx.get_feature_flags();
assert!(!advertised_feature_flags.avoid_relay_data);
assert!(advertised_feature_flags.is_public_server);
}
#[tokio::test]
async fn disable_relay_data_sets_avoid_relay_feature_flag() {
let config = TomlConfigLoader::default();
let global_ctx = GlobalCtx::new(config);
let mut flags = global_ctx.get_flags().clone();
flags.disable_relay_data = true;
global_ctx.set_flags(flags);
assert!(global_ctx.get_feature_flags().avoid_relay_data);
let mut flags = global_ctx.get_flags().clone();
flags.disable_relay_data = false;
global_ctx.set_flags(flags);
assert!(!global_ctx.get_feature_flags().avoid_relay_data);
global_ctx.set_avoid_relay_data_preference(true);
let mut flags = global_ctx.get_flags().clone();
flags.disable_relay_data = true;
global_ctx.set_flags(flags);
assert!(global_ctx.get_feature_flags().avoid_relay_data);
let mut flags = global_ctx.get_flags().clone();
flags.disable_relay_data = false;
global_ctx.set_flags(flags);
assert!(global_ctx.get_feature_flags().avoid_relay_data);
}
#[tokio::test] #[tokio::test]
async fn should_deny_proxy_for_process_wide_rpc_port() { async fn should_deny_proxy_for_process_wide_rpc_port() {
protected_port::clear_protected_tcp_ports_for_test(); protected_port::clear_protected_tcp_ports_for_test();
+5
View File
@@ -340,6 +340,11 @@ impl InstanceConfigPatcher {
global_ctx.set_ipv6(Some(ipv6.into())); global_ctx.set_ipv6(Some(ipv6.into()));
global_ctx.config.set_ipv6(Some(ipv6.into())); global_ctx.config.set_ipv6(Some(ipv6.into()));
} }
if let Some(disable_relay_data) = patch.disable_relay_data {
let mut flags = global_ctx.get_flags();
flags.disable_relay_data = disable_relay_data;
global_ctx.set_flags(flags);
}
if let Some(enabled) = patch.ipv6_public_addr_provider { if let Some(enabled) = patch.ipv6_public_addr_provider {
global_ctx.config.set_ipv6_public_addr_provider(enabled); global_ctx.config.set_ipv6_public_addr_provider(enabled);
provider_config_changed = true; provider_config_changed = true;
+2 -10
View File
@@ -361,16 +361,8 @@ fn apply_public_ipv6_provider_runtime_state(
let prefix_changed = global_ctx.set_advertised_ipv6_public_addr_prefix(next_prefix); let prefix_changed = global_ctx.set_advertised_ipv6_public_addr_prefix(next_prefix);
let next_provider_enabled = matches!(state, PublicIpv6ProviderRuntimeState::Active(_)); let next_provider_enabled = matches!(state, PublicIpv6ProviderRuntimeState::Active(_));
let feature_changed = { let feature_changed =
let mut feature_flags = global_ctx.get_feature_flags(); global_ctx.set_ipv6_public_addr_provider_feature_flag(next_provider_enabled);
if feature_flags.ipv6_public_addr_provider == next_provider_enabled {
false
} else {
feature_flags.ipv6_public_addr_provider = next_provider_enabled;
global_ctx.set_feature_flags(feature_flags);
true
}
};
prefix_changed || feature_changed prefix_changed || feature_changed
} }
+5
View File
@@ -816,6 +816,10 @@ impl NetworkConfig {
flags.disable_upnp = disable_upnp; flags.disable_upnp = disable_upnp;
} }
if let Some(disable_relay_data) = self.disable_relay_data {
flags.disable_relay_data = disable_relay_data;
}
if let Some(disable_sym_hole_punching) = self.disable_sym_hole_punching { if let Some(disable_sym_hole_punching) = self.disable_sym_hole_punching {
flags.disable_sym_hole_punching = disable_sym_hole_punching; flags.disable_sym_hole_punching = disable_sym_hole_punching;
} }
@@ -990,6 +994,7 @@ impl NetworkConfig {
result.disable_tcp_hole_punching = Some(flags.disable_tcp_hole_punching); result.disable_tcp_hole_punching = Some(flags.disable_tcp_hole_punching);
result.disable_udp_hole_punching = Some(flags.disable_udp_hole_punching); result.disable_udp_hole_punching = Some(flags.disable_udp_hole_punching);
result.disable_upnp = Some(flags.disable_upnp); result.disable_upnp = Some(flags.disable_upnp);
result.disable_relay_data = Some(flags.disable_relay_data);
result.disable_sym_hole_punching = Some(flags.disable_sym_hole_punching); result.disable_sym_hole_punching = Some(flags.disable_sym_hole_punching);
result.enable_magic_dns = Some(flags.accept_dns); result.enable_magic_dns = Some(flags.accept_dns);
result.mtu = Some(flags.mtu as i32); result.mtu = Some(flags.mtu as i32);
+228 -10
View File
@@ -56,7 +56,7 @@ use super::{
route_trait::NextHopPolicy, route_trait::NextHopPolicy,
traffic_metrics::{ traffic_metrics::{
InstanceLabelKind, LogicalTrafficMetrics, TrafficKind, TrafficMetricRecorder, InstanceLabelKind, LogicalTrafficMetrics, TrafficKind, TrafficMetricRecorder,
route_peer_info_instance_id, traffic_kind, is_relay_data_packet_type, route_peer_info_instance_id, traffic_kind,
}, },
}; };
@@ -69,11 +69,16 @@ pub trait GlobalForeignNetworkAccessor: Send + Sync + 'static {
struct ForeignNetworkEntry { struct ForeignNetworkEntry {
my_peer_id: PeerId, my_peer_id: PeerId,
// Node-global runtime flags, such as disable_relay_data, live on the parent
// context. The foreign context is scoped to the foreign network's OSPF view.
parent_global_ctx: ArcGlobalCtx,
global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx,
network: NetworkIdentity, network: NetworkIdentity,
peer_map: Arc<PeerMap>, peer_map: Arc<PeerMap>,
relay_peer_map: Arc<RelayPeerMap>, relay_peer_map: Arc<RelayPeerMap>,
peer_session_store: Arc<PeerSessionStore>, peer_session_store: Arc<PeerSessionStore>,
// Static per-network permission from the whitelist check. disable_relay_data
// is the node-wide runtime override layered on top of this value.
relay_data: bool, relay_data: bool,
pm_packet_sender: Mutex<Option<PacketRecvChan>>, pm_packet_sender: Mutex<Option<PacketRecvChan>>,
@@ -205,6 +210,7 @@ impl ForeignNetworkEntry {
Self { Self {
my_peer_id, my_peer_id,
parent_global_ctx: global_ctx.clone(),
global_ctx: foreign_global_ctx, global_ctx: foreign_global_ctx,
network, network,
peer_map, peer_map,
@@ -231,6 +237,27 @@ impl ForeignNetworkEntry {
} }
} }
fn desired_avoid_relay_data_feature_flag(
parent_global_ctx: &ArcGlobalCtx,
relay_data: bool,
) -> bool {
!relay_data || parent_global_ctx.get_feature_flags().avoid_relay_data
}
fn sync_parent_relay_data_feature_flag(
parent_global_ctx: &ArcGlobalCtx,
global_ctx: &ArcGlobalCtx,
relay_data: bool,
) -> bool {
let avoid_relay_data =
Self::desired_avoid_relay_data_feature_flag(parent_global_ctx, relay_data);
if global_ctx.get_feature_flags().avoid_relay_data == avoid_relay_data {
return false;
}
global_ctx.set_avoid_relay_data_preference(avoid_relay_data)
}
fn build_foreign_global_ctx( fn build_foreign_global_ctx(
network: &NetworkIdentity, network: &NetworkIdentity,
global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx,
@@ -258,10 +285,9 @@ impl ForeignNetworkEntry {
let mut feature_flag = global_ctx.get_feature_flags(); let mut feature_flag = global_ctx.get_feature_flags();
feature_flag.is_public_server = true; feature_flag.is_public_server = true;
if !relay_data { feature_flag.avoid_relay_data =
feature_flag.avoid_relay_data = true; Self::desired_avoid_relay_data_feature_flag(&global_ctx, relay_data);
} foreign_global_ctx.set_base_advertised_feature_flags(feature_flag);
foreign_global_ctx.set_feature_flags(feature_flag);
for u in global_ctx.get_running_listeners().into_iter() { for u in global_ctx.get_running_listeners().into_iter() {
foreign_global_ctx.add_running_listener(u); foreign_global_ctx.add_running_listener(u);
@@ -412,6 +438,7 @@ impl ForeignNetworkEntry {
let peer_map = self.peer_map.clone(); let peer_map = self.peer_map.clone();
let relay_peer_map = self.relay_peer_map.clone(); let relay_peer_map = self.relay_peer_map.clone();
let traffic_metrics = self.traffic_metrics.clone(); let traffic_metrics = self.traffic_metrics.clone();
let parent_global_ctx = self.parent_global_ctx.clone();
let relay_data = self.relay_data; let relay_data = self.relay_data;
let pm_sender = self.pm_packet_sender.lock().await.take().unwrap(); let pm_sender = self.pm_packet_sender.lock().await.take().unwrap();
let network_name = self.network.network_name.clone(); let network_name = self.network.network_name.clone();
@@ -497,11 +524,16 @@ impl ForeignNetworkEntry {
"ignore packet in foreign network" "ignore packet in foreign network"
); );
} else { } else {
if packet_type == PacketType::Data as u8 if is_relay_data_packet_type(packet_type) {
|| packet_type == PacketType::KcpSrc as u8 let disable_relay_data = parent_global_ctx.flags_arc().disable_relay_data;
|| packet_type == PacketType::KcpDst as u8 if !relay_data || disable_relay_data {
{ tracing::debug!(
if !relay_data { ?from_peer_id,
?to_peer_id,
packet_type,
disable_relay_data,
"drop foreign network relay data"
);
continue; continue;
} }
if !bps_limiter.try_consume(len.into()) { if !bps_limiter.try_consume(len.into()) {
@@ -589,10 +621,31 @@ impl ForeignNetworkEntry {
}); });
} }
async fn run_parent_feature_flag_sync_routine(&self) {
let parent_global_ctx = self.parent_global_ctx.clone();
let global_ctx = self.global_ctx.clone();
let relay_data = self.relay_data;
self.tasks.lock().await.spawn(async move {
let mut parent_events = parent_global_ctx.subscribe();
loop {
ForeignNetworkEntry::sync_parent_relay_data_feature_flag(
&parent_global_ctx,
&global_ctx,
relay_data,
);
if parent_events.recv().await.is_err() {
parent_events = parent_global_ctx.subscribe();
}
}
});
}
async fn prepare(&self, accessor: Box<dyn GlobalForeignNetworkAccessor>) { async fn prepare(&self, accessor: Box<dyn GlobalForeignNetworkAccessor>) {
self.prepare_route(accessor).await; self.prepare_route(accessor).await;
self.start_packet_recv().await; self.start_packet_recv().await;
self.run_relay_session_gc_routine().await; self.run_relay_session_gc_routine().await;
self.run_parent_feature_flag_sync_routine().await;
self.peer_rpc.run(); self.peer_rpc.run();
self.peer_center.init().await; self.peer_center.init().await;
} }
@@ -1397,6 +1450,92 @@ pub mod tests {
); );
} }
#[tokio::test]
async fn disable_relay_data_blocks_foreign_network_transit_data() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
connect_peer_manager(pma_net1.clone(), pm_center.clone()).await;
connect_peer_manager(pmb_net1.clone(), pm_center.clone()).await;
wait_route_appear(pma_net1.clone(), pmb_net1.clone())
.await
.unwrap();
let mut flags = pm_center.get_global_ctx().get_flags();
flags.disable_relay_data = true;
pm_center.get_global_ctx().set_flags(flags);
pm_center
.get_global_ctx()
.issue_event(GlobalCtxEvent::ConfigPatched(Default::default()));
let center_peer_id = pm_center
.get_foreign_network_manager()
.get_network_peer_id("net1")
.unwrap();
wait_for_condition(
|| {
let pma_net1 = pma_net1.clone();
async move {
pma_net1.list_routes().await.iter().any(|route| {
route.peer_id == center_peer_id
&& route
.feature_flag
.as_ref()
.map(|flag| flag.avoid_relay_data)
.unwrap_or(false)
})
}
},
Duration::from_secs(5),
)
.await;
let network_labels =
LabelSet::new().with_label_type(LabelType::NetworkName("net1".to_string()));
let forwarded_bytes_before = metric_value(
&pm_center,
MetricName::TrafficBytesForwarded,
network_labels.clone(),
);
let forwarded_packets_before = metric_value(
&pm_center,
MetricName::TrafficPacketsForwarded,
network_labels.clone(),
);
let mut transit_pkt = ZCPacket::new_with_payload(b"foreign-transit-disabled");
transit_pkt.fill_peer_manager_hdr(
pma_net1.my_peer_id(),
pmb_net1.my_peer_id(),
PacketType::Data as u8,
);
pma_net1
.get_foreign_network_client()
.send_msg(transit_pkt, center_peer_id)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(300)).await;
assert_eq!(
metric_value(
&pm_center,
MetricName::TrafficBytesForwarded,
network_labels.clone()
),
forwarded_bytes_before
);
assert_eq!(
metric_value(
&pm_center,
MetricName::TrafficPacketsForwarded,
network_labels
),
forwarded_packets_before
);
}
#[tokio::test] #[tokio::test]
async fn foreign_network_transit_control_forwarding_records_control_forwarded_metrics() { async fn foreign_network_transit_control_forwarding_records_control_forwarded_metrics() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
@@ -1409,6 +1548,10 @@ pub mod tests {
.await .await
.unwrap(); .unwrap();
let mut flags = pm_center.get_global_ctx().get_flags();
flags.disable_relay_data = true;
pm_center.get_global_ctx().set_flags(flags);
let center_peer_id = pm_center let center_peer_id = pm_center
.get_foreign_network_manager() .get_foreign_network_manager()
.get_network_peer_id("net1") .get_network_peer_id("net1")
@@ -1657,6 +1800,81 @@ pub mod tests {
)); ));
} }
#[tokio::test]
async fn foreign_entry_feature_flag_tracks_parent_disable_relay_data_toggle() {
let global_ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new(
"__access__".to_string(),
"access_secret".to_string(),
)));
let foreign_network = NetworkIdentity::new("net1".to_string(), "net1_secret".to_string());
let (pm_packet_sender, _pm_packet_recv) = create_packet_recv_chan();
let entry = ForeignNetworkEntry::new(
foreign_network,
1,
global_ctx.clone(),
true,
Arc::new(PeerSessionStore::new()),
pm_packet_sender,
);
assert!(!entry.global_ctx.get_feature_flags().avoid_relay_data);
entry.run_parent_feature_flag_sync_routine().await;
let mut flags = global_ctx.get_flags();
flags.disable_relay_data = true;
global_ctx.set_flags(flags);
global_ctx.issue_event(GlobalCtxEvent::ConfigPatched(Default::default()));
wait_for_condition(
|| async { entry.global_ctx.get_feature_flags().avoid_relay_data },
Duration::from_secs(2),
)
.await;
let mut flags = global_ctx.get_flags();
flags.disable_relay_data = false;
global_ctx.set_flags(flags);
global_ctx.issue_event(GlobalCtxEvent::ConfigPatched(Default::default()));
wait_for_condition(
|| async { !entry.global_ctx.get_feature_flags().avoid_relay_data },
Duration::from_secs(2),
)
.await;
}
#[tokio::test]
async fn foreign_entry_without_relay_data_keeps_avoid_feature_flag() {
let global_ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new(
"__access__".to_string(),
"access_secret".to_string(),
)));
let foreign_network = NetworkIdentity::new("net1".to_string(), "net1_secret".to_string());
let (pm_packet_sender, _pm_packet_recv) = create_packet_recv_chan();
let entry = ForeignNetworkEntry::new(
foreign_network,
1,
global_ctx.clone(),
false,
Arc::new(PeerSessionStore::new()),
pm_packet_sender,
);
assert!(entry.global_ctx.get_feature_flags().avoid_relay_data);
let mut flags = global_ctx.get_flags();
flags.disable_relay_data = false;
global_ctx.set_flags(flags);
ForeignNetworkEntry::sync_parent_relay_data_feature_flag(
&global_ctx,
&entry.global_ctx,
entry.relay_data,
);
assert!(entry.global_ctx.get_feature_flags().avoid_relay_data);
}
#[test] #[test]
fn credential_trust_path_rejects_admin_identity() { fn credential_trust_path_rejects_admin_identity() {
assert!(ForeignNetworkManager::should_reject_credential_trust_path( assert!(ForeignNetworkManager::should_reject_credential_trust_path(
+136 -12
View File
@@ -38,7 +38,7 @@ use crate::{
route_trait::{ForeignNetworkRouteInfoMap, MockRoute, NextHopPolicy, RouteInterface}, route_trait::{ForeignNetworkRouteInfoMap, MockRoute, NextHopPolicy, RouteInterface},
traffic_metrics::{ traffic_metrics::{
InstanceLabelKind, LogicalTrafficMetrics, TrafficKind, TrafficMetricRecorder, InstanceLabelKind, LogicalTrafficMetrics, TrafficKind, TrafficMetricRecorder,
route_peer_info_instance_id, traffic_kind, is_relay_data_packet_type, route_peer_info_instance_id, traffic_kind,
}, },
}, },
proto::{ proto::{
@@ -263,9 +263,7 @@ impl PeerManager {
.is_err() .is_err()
{ {
// if local network is not in whitelist, avoid relay data when exist any other route path // if local network is not in whitelist, avoid relay data when exist any other route path
let mut f = global_ctx.get_feature_flags(); global_ctx.set_avoid_relay_data_preference(true);
f.avoid_relay_data = true;
global_ctx.set_feature_flags(f);
} }
let is_secure_mode_enabled = global_ctx let is_secure_mode_enabled = global_ctx
@@ -774,6 +772,7 @@ impl PeerManager {
my_peer_id: PeerId, my_peer_id: PeerId,
peer_map: &PeerMap, peer_map: &PeerMap,
foreign_network_mgr: &ForeignNetworkManager, foreign_network_mgr: &ForeignNetworkManager,
disable_relay_data: bool,
) -> Result<(), ZCPacket> { ) -> Result<(), ZCPacket> {
let pm_header = packet.peer_manager_header().unwrap(); let pm_header = packet.peer_manager_header().unwrap();
if pm_header.packet_type != PacketType::ForeignNetworkPacket as u8 { if pm_header.packet_type != PacketType::ForeignNetworkPacket as u8 {
@@ -783,6 +782,16 @@ impl PeerManager {
let from_peer_id = pm_header.from_peer_id.get(); let from_peer_id = pm_header.from_peer_id.get();
let to_peer_id = pm_header.to_peer_id.get(); let to_peer_id = pm_header.to_peer_id.get();
if disable_relay_data && Self::is_relay_data_zc_packet(&packet) {
tracing::debug!(
?from_peer_id,
?to_peer_id,
inner_packet_type = ?packet.foreign_network_inner_packet_type(),
"drop foreign network relay data while relay data is disabled"
);
return Ok(());
}
let foreign_hdr = packet.foreign_network_hdr().unwrap(); let foreign_hdr = packet.foreign_network_hdr().unwrap();
let foreign_network_name = foreign_hdr.get_network_name(packet.payload()); let foreign_network_name = foreign_hdr.get_network_name(packet.payload());
let foreign_peer_id = foreign_hdr.get_dst_peer_id(); let foreign_peer_id = foreign_hdr.get_dst_peer_id();
@@ -872,6 +881,29 @@ impl PeerManager {
} }
} }
fn is_relay_data_packet(packet_type: u8) -> bool {
is_relay_data_packet_type(packet_type)
}
fn is_relay_data_zc_packet(packet: &ZCPacket) -> bool {
let Some(hdr) = packet.peer_manager_header() else {
return false;
};
if hdr.packet_type == PacketType::ForeignNetworkPacket as u8 {
let inner_packet_type = packet.foreign_network_inner_packet_type();
if inner_packet_type.is_none() {
tracing::warn!(
?hdr,
"foreign network packet has unparseable inner peer manager header"
);
}
return inner_packet_type.is_none_or(Self::is_relay_data_packet);
}
Self::is_relay_data_packet(hdr.packet_type)
}
async fn start_peer_recv(&self) { async fn start_peer_recv(&self) {
let mut recv = self.packet_recv.lock().await.take().unwrap(); let mut recv = self.packet_recv.lock().await.take().unwrap();
let my_peer_id = self.my_peer_id; let my_peer_id = self.my_peer_id;
@@ -925,14 +957,21 @@ impl PeerManager {
self.tasks.lock().await.spawn(async move { self.tasks.lock().await.spawn(async move {
tracing::trace!("start_peer_recv"); tracing::trace!("start_peer_recv");
while let Ok(ret) = recv_packet_from_chan(&mut recv).await { while let Ok(ret) = recv_packet_from_chan(&mut recv).await {
let Err(mut ret) = let disable_relay_data = global_ctx.flags_arc().disable_relay_data;
Self::try_handle_foreign_network_packet(ret, my_peer_id, &peers, &foreign_mgr) let Err(mut ret) = Self::try_handle_foreign_network_packet(
.await ret,
my_peer_id,
&peers,
&foreign_mgr,
disable_relay_data,
)
.await
else { else {
continue; continue;
}; };
let buf_len = ret.buf_len(); let buf_len = ret.buf_len();
let is_relay_data_packet = Self::is_relay_data_zc_packet(&ret);
let Some(hdr) = ret.mut_peer_manager_header() else { let Some(hdr) = ret.mut_peer_manager_header() else {
tracing::warn!(?ret, "invalid packet, skip"); tracing::warn!(?ret, "invalid packet, skip");
continue; continue;
@@ -944,6 +983,16 @@ impl PeerManager {
let packet_type = hdr.packet_type; let packet_type = hdr.packet_type;
let is_encrypted = hdr.is_encrypted(); let is_encrypted = hdr.is_encrypted();
if to_peer_id != my_peer_id { if to_peer_id != my_peer_id {
if disable_relay_data && is_relay_data_packet {
tracing::debug!(
?from_peer_id,
?to_peer_id,
packet_type,
"drop forwarded relay data while relay data is disabled"
);
continue;
}
if hdr.forward_counter > 7 { if hdr.forward_counter > 7 {
tracing::warn!(?hdr, "forward counter exceed, drop packet"); tracing::warn!(?hdr, "forward counter exceed, drop packet");
continue; continue;
@@ -2080,7 +2129,7 @@ mod tests {
}, },
}, },
proto::{ proto::{
common::{CompressionAlgoPb, NatType, PeerFeatureFlag}, common::{CompressionAlgoPb, NatType},
peer_rpc::SecureAuthLevel, peer_rpc::SecureAuthLevel,
}, },
tunnel::{ tunnel::{
@@ -2224,6 +2273,84 @@ mod tests {
assert_eq!(signal.version(), initial_version + 2); assert_eq!(signal.version(), initial_version + 2);
} }
#[test]
fn disable_relay_data_classifies_data_plane_packets_only() {
for packet_type in [
PacketType::Data,
PacketType::KcpSrc,
PacketType::KcpDst,
PacketType::QuicSrc,
PacketType::QuicDst,
PacketType::DataWithKcpSrcModified,
PacketType::DataWithQuicSrcModified,
PacketType::RelayHandshake,
PacketType::RelayHandshakeAck,
PacketType::ForeignNetworkPacket,
] {
assert!(PeerManager::is_relay_data_packet(packet_type as u8));
}
for packet_type in [
PacketType::RpcReq,
PacketType::RpcResp,
PacketType::Ping,
PacketType::Pong,
PacketType::HandShake,
PacketType::NoiseHandshakeMsg1,
PacketType::NoiseHandshakeMsg2,
PacketType::NoiseHandshakeMsg3,
] {
assert!(!PeerManager::is_relay_data_packet(packet_type as u8));
}
}
#[test]
fn disable_relay_data_inspects_foreign_network_inner_packet_type() {
let network_name = "net1".to_string();
let mut rpc_packet = ZCPacket::new_with_payload(b"rpc");
rpc_packet.fill_peer_manager_hdr(1, 2, PacketType::RpcReq as u8);
let mut foreign_rpc_packet =
ZCPacket::new_for_foreign_network(&network_name, 2, &rpc_packet);
foreign_rpc_packet.fill_peer_manager_hdr(10, 20, PacketType::ForeignNetworkPacket as u8);
assert_eq!(
foreign_rpc_packet.foreign_network_inner_packet_type(),
Some(PacketType::RpcReq as u8)
);
assert!(!PeerManager::is_relay_data_zc_packet(&foreign_rpc_packet));
let mut data_packet = ZCPacket::new_with_payload(b"data");
data_packet.fill_peer_manager_hdr(1, 2, PacketType::Data as u8);
let mut foreign_data_packet =
ZCPacket::new_for_foreign_network(&network_name, 2, &data_packet);
foreign_data_packet.fill_peer_manager_hdr(10, 20, PacketType::ForeignNetworkPacket as u8);
assert_eq!(
foreign_data_packet.foreign_network_inner_packet_type(),
Some(PacketType::Data as u8)
);
assert!(PeerManager::is_relay_data_zc_packet(&foreign_data_packet));
}
#[tokio::test]
async fn non_whitelisted_network_avoid_relay_survives_disable_relay_data_toggle() {
let global_ctx = get_mock_global_ctx();
let mut flags = global_ctx.get_flags();
flags.disable_relay_data = true;
flags.relay_network_whitelist = "other-network".to_string();
global_ctx.set_flags(flags);
let (packet_send, _packet_recv) = create_packet_recv_chan();
let _peer_mgr = PeerManager::new(RouteAlgoType::Ospf, global_ctx.clone(), packet_send);
let mut flags = global_ctx.get_flags();
flags.disable_relay_data = false;
global_ctx.set_flags(flags);
assert!(global_ctx.get_feature_flags().avoid_relay_data);
}
#[tokio::test] #[tokio::test]
async fn send_msg_internal_does_not_record_tx_metrics_on_failed_delivery() { async fn send_msg_internal_does_not_record_tx_metrics_on_failed_delivery() {
let peer_mgr = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
@@ -3121,10 +3248,7 @@ mod tests {
// when b's avoid_relay_data is true, a->c should route through d and e, cost is 3 // when b's avoid_relay_data is true, a->c should route through d and e, cost is 3
peer_mgr_b peer_mgr_b
.get_global_ctx() .get_global_ctx()
.set_feature_flags(PeerFeatureFlag { .set_avoid_relay_data_preference(true);
avoid_relay_data: true,
..Default::default()
});
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
if wait_route_appear_with_cost(peer_mgr_a.clone(), peer_mgr_c.my_peer_id, Some(3)) if wait_route_appear_with_cost(peer_mgr_a.clone(), peer_mgr_c.my_peer_id, Some(3))
.await .await
+7
View File
@@ -241,6 +241,13 @@ pub(crate) fn traffic_kind(packet_type: u8) -> TrafficKind {
} }
} }
pub(crate) fn is_relay_data_packet_type(packet_type: u8) -> bool {
traffic_kind(packet_type) == TrafficKind::Data
|| packet_type == PacketType::RelayHandshake as u8
|| packet_type == PacketType::RelayHandshakeAck as u8
|| packet_type == PacketType::ForeignNetworkPacket as u8
}
#[derive(Clone)] #[derive(Clone)]
struct TrafficMetricGroup { struct TrafficMetricGroup {
data: Arc<LogicalTrafficMetrics>, data: Arc<LogicalTrafficMetrics>,
+1
View File
@@ -27,6 +27,7 @@ message InstanceConfigPatch {
optional bool ipv6_public_addr_provider = 11; optional bool ipv6_public_addr_provider = 11;
optional bool ipv6_public_addr_auto = 12; optional bool ipv6_public_addr_auto = 12;
optional string ipv6_public_addr_prefix = 13; optional string ipv6_public_addr_prefix = 13;
optional bool disable_relay_data = 14;
} }
message PortForwardPatch { message PortForwardPatch {
+1
View File
@@ -99,6 +99,7 @@ message NetworkConfig {
optional bool ipv6_public_addr_provider = 62; optional bool ipv6_public_addr_provider = 62;
optional bool ipv6_public_addr_auto = 63; optional bool ipv6_public_addr_auto = 63;
optional string ipv6_public_addr_prefix = 64; optional string ipv6_public_addr_prefix = 64;
optional bool disable_relay_data = 65;
} }
message PortForwardConfig { message PortForwardConfig {
+1
View File
@@ -75,6 +75,7 @@ message FlagsInConfig {
bool need_p2p = 38; bool need_p2p = 38;
uint64 instance_recv_bps_limit = 39; uint64 instance_recv_bps_limit = 39;
bool disable_upnp = 40; bool disable_upnp = 40;
bool disable_relay_data = 41;
} }
message RpcDescriptor { message RpcDescriptor {
+3 -3
View File
@@ -501,10 +501,10 @@ async fn credential_relay_capability(#[case] allow_relay: bool) {
// Create admin node // Create admin node
let admin_config = create_admin_config("admin", Some("ns_adm"), "10.144.144.1", "fd00::1/64"); let admin_config = create_admin_config("admin", Some("ns_adm"), "10.144.144.1", "fd00::1/64");
let mut admin_inst = Instance::new(admin_config); let mut admin_inst = Instance::new(admin_config);
let mut ff = admin_inst.get_global_ctx().get_feature_flags();
// if cred c allow relay, we set admin inst avoid relay (if other same-cost path available, admin will not relay data) // if cred c allow relay, we set admin inst avoid relay (if other same-cost path available, admin will not relay data)
ff.avoid_relay_data = allow_relay; admin_inst
admin_inst.get_global_ctx().set_feature_flags(ff); .get_global_ctx()
.set_avoid_relay_data_preference(allow_relay);
admin_inst.run().await.unwrap(); admin_inst.run().await.unwrap();
let admin_peer_id = admin_inst.peer_id(); let admin_peer_id = admin_inst.peer_id();
+147
View File
@@ -3730,6 +3730,153 @@ pub async fn config_patch_test() {
drop_insts(insts).await; drop_insts(insts).await;
} }
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]
pub async fn config_patch_disable_relay_data_test() {
use crate::proto::api::config::InstanceConfigPatch;
let insts = init_three_node_ex(
"udp",
|cfg| {
cfg.set_ipv6(None);
cfg
},
false,
)
.await;
let relay_peer_id = insts[1].peer_id();
let dst_peer_id = insts[2].peer_id();
assert!(!insts[1].get_global_ctx().get_flags().disable_relay_data);
assert!(
!insts[1]
.get_global_ctx()
.get_feature_flags()
.avoid_relay_data
);
check_route_ex(
insts[0].get_peer_manager().list_routes().await,
dst_peer_id,
|route| {
assert_eq!(route.next_hop_peer_id, relay_peer_id);
true
},
);
wait_for_condition(
|| async { ping_test("net_a", "10.144.144.3", None).await },
Duration::from_secs(5),
)
.await;
insts[1]
.get_config_patcher()
.apply_patch(InstanceConfigPatch {
disable_relay_data: Some(true),
..Default::default()
})
.await
.unwrap();
assert!(insts[1].get_global_ctx().get_flags().disable_relay_data);
assert!(
insts[1]
.get_global_ctx()
.config
.get_flags()
.disable_relay_data
);
assert!(
insts[1]
.get_global_ctx()
.get_feature_flags()
.avoid_relay_data
);
wait_for_condition(
|| {
let peer_mgr = insts[0].get_peer_manager().clone();
async move {
peer_mgr.list_routes().await.iter().any(|route| {
route.peer_id == relay_peer_id
&& route
.feature_flag
.as_ref()
.map(|flag| flag.avoid_relay_data)
.unwrap_or(false)
})
}
},
Duration::from_secs(5),
)
.await;
check_route_ex(
insts[0].get_peer_manager().list_routes().await,
dst_peer_id,
|route| {
assert_eq!(route.next_hop_peer_id, relay_peer_id);
true
},
);
assert!(
!ping_test("net_a", "10.144.144.3", None).await,
"traffic from inst1 to inst3 should be blocked while inst2 relay data is disabled"
);
insts[1]
.get_config_patcher()
.apply_patch(InstanceConfigPatch {
disable_relay_data: Some(false),
..Default::default()
})
.await
.unwrap();
assert!(!insts[1].get_global_ctx().get_flags().disable_relay_data);
assert!(
!insts[1]
.get_global_ctx()
.config
.get_flags()
.disable_relay_data
);
assert!(
!insts[1]
.get_global_ctx()
.get_feature_flags()
.avoid_relay_data
);
wait_for_condition(
|| {
let peer_mgr = insts[0].get_peer_manager().clone();
async move {
peer_mgr.list_routes().await.iter().any(|route| {
route.peer_id == relay_peer_id
&& route
.feature_flag
.as_ref()
.map(|flag| !flag.avoid_relay_data)
.unwrap_or(false)
})
}
},
Duration::from_secs(5),
)
.await;
wait_for_condition(
|| async { ping_test("net_a", "10.144.144.3", None).await },
Duration::from_secs(5),
)
.await;
drop_insts(insts).await;
}
/// Generate SecureModeConfig with specified x25519 private key /// Generate SecureModeConfig with specified x25519 private key
pub fn generate_secure_mode_config_with_key( pub fn generate_secure_mode_config_with_key(
private_key: &x25519_dalek::StaticSecret, private_key: &x25519_dalek::StaticSecret,
+11
View File
@@ -730,6 +730,17 @@ impl ZCPacket {
} }
} }
pub fn foreign_network_inner_packet_type(&self) -> Option<u8> {
if self.peer_manager_header()?.packet_type != PacketType::ForeignNetworkPacket as u8 {
return None;
}
let payload = self.payload();
let hdr = ForeignNetworkPacketHeader::ref_from_prefix(payload)?;
let inner_packet = payload.get(hdr.get_header_len()..)?;
PeerManagerHeader::ref_from_prefix(inner_packet).map(|hdr| hdr.packet_type)
}
pub fn foreign_network_packet(mut self) -> Self { pub fn foreign_network_packet(mut self) -> Self {
let hdr = self.foreign_network_hdr().unwrap(); let hdr = self.foreign_network_hdr().unwrap();
let foreign_hdr_len = hdr.get_header_len(); let foreign_hdr_len = hdr.get_header_len();