From 18478b7c4bf60944aab93f1a8fc407b17244f4ea Mon Sep 17 00:00:00 2001 From: Mg Pig Date: Tue, 30 Dec 2025 19:26:42 +0800 Subject: [PATCH] fix(android): update vpn routes when proxy cidrs change (#1717) --- easytier-gui/src-tauri/src/lib.rs | 11 +- easytier-gui/src/composables/event.ts | 17 ++ easytier-web/frontend-lib/src/locales/cn.yaml | 1 + easytier-web/frontend-lib/src/locales/en.yaml | 1 + .../frontend-lib/src/types/network.ts | 2 + easytier/src/common/global_ctx.rs | 2 + easytier/src/instance/instance.rs | 10 ++ easytier/src/instance/mod.rs | 2 + easytier/src/instance/proxy_cidrs_monitor.rs | 102 +++++++++++ easytier/src/instance/virtual_nic.rs | 167 +++++++++++------- easytier/src/instance_manager.rs | 10 ++ 11 files changed, 257 insertions(+), 68 deletions(-) create mode 100644 easytier/src/instance/proxy_cidrs_monitor.rs diff --git a/easytier-gui/src-tauri/src/lib.rs b/easytier-gui/src-tauri/src/lib.rs index 653006c7..1b1a7eaf 100644 --- a/easytier-gui/src-tauri/src/lib.rs +++ b/easytier-gui/src-tauri/src/lib.rs @@ -793,15 +793,18 @@ mod manager { tokio::spawn(async move { loop { match event_receiver.recv().await { - Ok(event) => { - if let easytier::common::global_ctx::GlobalCtxEvent::DhcpIpv4Changed(_, _) = event { - let _ = app_clone.emit("dhcp_ip_changed", instance_id_clone); - } + Ok(easytier::common::global_ctx::GlobalCtxEvent::DhcpIpv4Changed(_, _)) => { + let _ = app_clone.emit("dhcp_ip_changed", instance_id_clone); } + Ok(easytier::common::global_ctx::GlobalCtxEvent::ProxyCidrsUpdated(_, _)) => { + let _ = app_clone.emit("proxy_cidrs_updated", instance_id_clone); + } + Ok(_) => {} Err(tokio::sync::broadcast::error::RecvError::Closed) => { break; } Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { + let _ = app_clone.emit("event_lagged", instance_id_clone); event_receiver = event_receiver.resubscribe(); } } diff --git a/easytier-gui/src/composables/event.ts b/easytier-gui/src/composables/event.ts index 00353f6c..e4c45655 100644 --- a/easytier-gui/src/composables/event.ts +++ b/easytier-gui/src/composables/event.ts @@ -8,6 +8,8 @@ const EVENTS = Object.freeze({ POST_RUN_NETWORK_INSTANCE: 'post_run_network_instance', VPN_SERVICE_STOP: 'vpn_service_stop', DHCP_IP_CHANGED: 'dhcp_ip_changed', + PROXY_CIDRS_UPDATED: 'proxy_cidrs_updated', + EVENT_LAGGED: 'event_lagged', }); function onSaveConfigs(event: Event) { @@ -38,6 +40,19 @@ async function onDhcpIpChanged(event: Event) { } } +async function onProxyCidrsUpdated(event: Event) { + console.log(`Received event '${EVENTS.PROXY_CIDRS_UPDATED}' for instance: ${event.payload}`); + if (type() === 'android') { + await onNetworkInstanceChange(event.payload); + } +} + +async function onEventLagged(event: Event) { + if (type() === 'android') { + await onNetworkInstanceChange(event.payload); + } +} + export async function listenGlobalEvents() { const unlisteners = [ await listen(EVENTS.SAVE_CONFIGS, onSaveConfigs), @@ -45,6 +60,8 @@ export async function listenGlobalEvents() { await listen(EVENTS.POST_RUN_NETWORK_INSTANCE, onPostRunNetworkInstance), await listen(EVENTS.VPN_SERVICE_STOP, onVpnServiceStop), await listen(EVENTS.DHCP_IP_CHANGED, onDhcpIpChanged), + await listen(EVENTS.PROXY_CIDRS_UPDATED, onProxyCidrsUpdated), + await listen(EVENTS.EVENT_LAGGED, onEventLagged), ]; return () => { diff --git a/easytier-web/frontend-lib/src/locales/cn.yaml b/easytier-web/frontend-lib/src/locales/cn.yaml index ea8be33e..9de3ba25 100644 --- a/easytier-web/frontend-lib/src/locales/cn.yaml +++ b/easytier-web/frontend-lib/src/locales/cn.yaml @@ -228,6 +228,7 @@ event: DhcpIpv4Changed: DHCP IPv4地址更改 DhcpIpv4Conflicted: DHCP IPv4地址冲突 PortForwardAdded: 端口转发添加 + ProxyCidrsUpdated: 子网代理CIDR更新 web: login: diff --git a/easytier-web/frontend-lib/src/locales/en.yaml b/easytier-web/frontend-lib/src/locales/en.yaml index f4c918fe..ad66be73 100644 --- a/easytier-web/frontend-lib/src/locales/en.yaml +++ b/easytier-web/frontend-lib/src/locales/en.yaml @@ -228,6 +228,7 @@ event: DhcpIpv4Changed: DhcpIpv4Changed DhcpIpv4Conflicted: DhcpIpv4Conflicted PortForwardAdded: PortForwardAdded + ProxyCidrsUpdated: ProxyCidrsUpdated web: login: diff --git a/easytier-web/frontend-lib/src/types/network.ts b/easytier-web/frontend-lib/src/types/network.ts index 10fe8177..89b29017 100644 --- a/easytier-web/frontend-lib/src/types/network.ts +++ b/easytier-web/frontend-lib/src/types/network.ts @@ -313,4 +313,6 @@ export enum EventType { DhcpIpv4Conflicted = 'DhcpIpv4Conflicted', // ipv4 | null PortForwardAdded = 'PortForwardAdded', // PortForwardConfigPb + + ProxyCidrsUpdated = 'ProxyCidrsUpdated', // string[], string[] } diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index b5869f48..a35a9746 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -55,6 +55,8 @@ pub enum GlobalCtxEvent { PortForwardAdded(PortForwardConfigPb), ConfigPatched(InstanceConfigPatch), + + ProxyCidrsUpdated(Vec, Vec), // (added, removed) } pub type EventBus = tokio::sync::broadcast::Sender; diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index d4c404bc..8b6011e2 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -534,6 +534,8 @@ pub struct Instance { #[cfg(feature = "socks5")] socks5_server: Arc, + proxy_cidrs_monitor: Option>, + global_ctx: ArcGlobalCtx, } @@ -613,6 +615,8 @@ impl Instance { #[cfg(feature = "socks5")] socks5_server, + proxy_cidrs_monitor: None, + global_ctx, } } @@ -964,6 +968,12 @@ impl Instance { self.add_initial_peers().await?; + let monitor = super::proxy_cidrs_monitor::ProxyCidrsMonitor::new( + self.peer_manager.clone(), + self.global_ctx.clone(), + ); + self.proxy_cidrs_monitor = Some(monitor.start()); + if self.global_ctx.get_vpn_portal_cidr().is_some() { self.run_vpn_portal().await?; } diff --git a/easytier/src/instance/mod.rs b/easytier/src/instance/mod.rs index 82918ff3..d50ad23e 100644 --- a/easytier/src/instance/mod.rs +++ b/easytier/src/instance/mod.rs @@ -4,5 +4,7 @@ pub mod instance; pub mod listeners; +pub mod proxy_cidrs_monitor; + #[cfg(feature = "tun")] pub mod virtual_nic; diff --git a/easytier/src/instance/proxy_cidrs_monitor.rs b/easytier/src/instance/proxy_cidrs_monitor.rs new file mode 100644 index 00000000..ced62479 --- /dev/null +++ b/easytier/src/instance/proxy_cidrs_monitor.rs @@ -0,0 +1,102 @@ +use std::collections::BTreeSet; +use std::sync::{Arc, Weak}; +use std::time::Instant; + +use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtxEvent}; +use crate::common::scoped_task::ScopedTask; +use crate::peers::peer_manager::PeerManager; + +/// ProxyCidrsMonitor monitors changes in proxy CIDRs from peer routes +/// and emits GlobalCtxEvent::ProxyCidrsUpdated with added/removed diffs. +pub struct ProxyCidrsMonitor { + peer_mgr: Weak, + global_ctx: ArcGlobalCtx, +} + +impl ProxyCidrsMonitor { + pub fn new(peer_mgr: Arc, global_ctx: ArcGlobalCtx) -> Self { + Self { + peer_mgr: Arc::downgrade(&peer_mgr), + global_ctx, + } + } + + /// Collects current proxy_cidrs from peer routes, VPN portal config, and manual routes. + /// This is a static function that can be used for initial sync or recovery after Lagged errors. + pub async fn diff_proxy_cidrs( + peer_mgr: &PeerManager, + global_ctx: &ArcGlobalCtx, + cur_proxy_cidrs: &mut BTreeSet, + ) -> (Vec, Vec) { + // Collect proxy_cidrs from routes + let mut proxy_cidrs = BTreeSet::new(); + let routes = peer_mgr.list_routes().await; + for r in routes { + for cidr in r.proxy_cidrs { + let Ok(cidr) = cidr.parse::() else { + continue; + }; + proxy_cidrs.insert(cidr); + } + } + + // Add VPN portal cidr to proxy_cidrs + if let Some(vpn_cfg) = global_ctx.config.get_vpn_portal_config() { + proxy_cidrs.insert(vpn_cfg.client_cidr); + } + + // If has manual routes, override entire proxy_cidrs + if let Some(routes) = global_ctx.config.get_routes() { + proxy_cidrs = routes.into_iter().collect(); + } + + // Calculate diff + if cur_proxy_cidrs == &proxy_cidrs { + return (Vec::new(), Vec::new()); + } + let added: Vec = proxy_cidrs.difference(cur_proxy_cidrs).cloned().collect(); + let removed: Vec = + cur_proxy_cidrs.difference(&proxy_cidrs).cloned().collect(); + + *cur_proxy_cidrs = proxy_cidrs; + + (added, removed) + } + + /// Starts monitoring proxy_cidrs changes and emits events with diffs + pub fn start(self) -> ScopedTask<()> { + ScopedTask::from(tokio::spawn(async move { + let mut cur_proxy_cidrs = BTreeSet::new(); + let mut last_update = None::; + + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + let Some(peer_mgr) = self.peer_mgr.upgrade() else { + tracing::warn!("peer manager dropped, stopping ProxyCidrsMonitor"); + break; + }; + + // Check if route info has been updated + let last_update_time = peer_mgr.get_route_peer_info_last_update_time().await; + if last_update == Some(last_update_time) { + continue; + } + last_update = Some(last_update_time); + + let (added, removed) = Self::diff_proxy_cidrs( + peer_mgr.as_ref(), + &self.global_ctx, + &mut cur_proxy_cidrs, + ) + .await; + + if added.is_empty() && removed.is_empty() { + continue; + } + self.global_ctx + .issue_event(GlobalCtxEvent::ProxyCidrsUpdated(added, removed)); + } + })) + } +} diff --git a/easytier/src/instance/virtual_nic.rs b/easytier/src/instance/virtual_nic.rs index 525f6912..b7ee2aed 100644 --- a/easytier/src/instance/virtual_nic.rs +++ b/easytier/src/instance/virtual_nic.rs @@ -13,6 +13,7 @@ use crate::{ global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, ifcfg::{IfConfiger, IfConfiguerTrait}, }, + instance::proxy_cidrs_monitor::ProxyCidrsMonitor, peers::{peer_manager::PeerManager, recv_packet_from_chan, PacketRecvChanReceiver}, tunnel::{ common::{reserve_buf, FramedWriter, TunnelWrapper, ZCPacketToBytes}, @@ -825,6 +826,57 @@ impl NicCtx { }); } + async fn apply_route_changes( + ifcfg: &impl IfConfiguerTrait, + ifname: &str, + net_ns: &crate::common::netns::NetNS, + cur_proxy_cidrs: &mut BTreeSet, + added: Vec, + removed: Vec, + ) { + tracing::debug!(?added, ?removed, "applying proxy_cidrs route changes"); + + // Remove routes + for cidr in removed { + if !cur_proxy_cidrs.contains(&cidr) { + continue; + } + let _g = net_ns.guard(); + let ret = ifcfg + .remove_ipv4_route(ifname, cidr.first_address(), cidr.network_length()) + .await; + + if ret.is_err() { + tracing::trace!( + cidr = ?cidr, + err = ?ret, + "remove route failed.", + ); + } + cur_proxy_cidrs.remove(&cidr); + } + + // Add routes + for cidr in added { + if cur_proxy_cidrs.contains(&cidr) { + continue; + } + let _g = net_ns.guard(); + let ret = ifcfg + .add_ipv4_route(ifname, cidr.first_address(), cidr.network_length(), None) + .await; + + if ret.is_err() { + tracing::trace!( + cidr = ?cidr, + err = ?ret, + "add route failed.", + ); + } + cur_proxy_cidrs.insert(cidr); + } + } + async fn run_proxy_cidrs_route_updater(&mut self) -> Result<(), Error> { let Some(peer_mgr) = self.peer_mgr.upgrade() else { return Err(anyhow::anyhow!("peer manager not available").into()); @@ -834,79 +886,66 @@ impl NicCtx { let nic = self.nic.lock().await; let ifcfg = nic.get_ifcfg(); let ifname = nic.ifname().to_owned(); + let mut event_receiver = global_ctx.subscribe(); self.tasks.spawn(async move { - let mut cur_proxy_cidrs = BTreeSet::new(); + let mut cur_proxy_cidrs = BTreeSet::::new(); + + // Initial sync: get current proxy_cidrs state and apply routes + let (added, removed) = ProxyCidrsMonitor::diff_proxy_cidrs( + peer_mgr.as_ref(), + &global_ctx, + &mut cur_proxy_cidrs, + ) + .await; + Self::apply_route_changes( + &ifcfg, + &ifname, + &net_ns, + &mut cur_proxy_cidrs, + added, + removed, + ) + .await; + loop { - let mut proxy_cidrs = BTreeSet::new(); - let routes = peer_mgr.list_routes().await; - for r in routes { - for cidr in r.proxy_cidrs { - let Ok(cidr) = cidr.parse::() else { - continue; - }; - proxy_cidrs.insert(cidr); + let event = match event_receiver.recv().await { + Ok(event) => event, + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + tracing::debug!("event bus closed, stopping proxy_cidrs route updater"); + break; } - } - // add vpn portal cidr to proxy_cidrs - if let Some(vpn_cfg) = global_ctx.config.get_vpn_portal_config() { - proxy_cidrs.insert(vpn_cfg.client_cidr); - } - - if let Some(routes) = global_ctx.config.get_routes() { - // if has manual routes, just override entire proxy_cidrs - proxy_cidrs = routes.into_iter().collect(); - } - - // if route is in cur_proxy_cidrs but not in proxy_cidrs, delete it. - for cidr in cur_proxy_cidrs.iter() { - if proxy_cidrs.contains(cidr) { - continue; - } - - let _g = net_ns.guard(); - let ret = ifcfg - .remove_ipv4_route( - ifname.as_str(), - cidr.first_address(), - cidr.network_length(), + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { + tracing::warn!( + "event bus lagged in proxy_cidrs route updater, doing full sync" + ); + event_receiver = event_receiver.resubscribe(); + // Full sync after lagged to recover consistent state + let (added, removed) = ProxyCidrsMonitor::diff_proxy_cidrs( + peer_mgr.as_ref(), + &global_ctx, + &mut cur_proxy_cidrs, ) .await; - - if ret.is_err() { - tracing::trace!( - cidr = ?cidr, - err = ?ret, - "remove route failed.", - ); + GlobalCtxEvent::ProxyCidrsUpdated(added, removed) } - } + }; - for cidr in proxy_cidrs.iter() { - if cur_proxy_cidrs.contains(cidr) { - continue; - } - let _g = net_ns.guard(); - let ret = ifcfg - .add_ipv4_route( - ifname.as_str(), - cidr.first_address(), - cidr.network_length(), - None, - ) - .await; + // Only handle ProxyCidrsUpdated events + let (added, removed) = match event { + GlobalCtxEvent::ProxyCidrsUpdated(added, removed) => (added, removed), + _ => continue, + }; - if ret.is_err() { - tracing::trace!( - cidr = ?cidr, - err = ?ret, - "add route failed.", - ); - } - } - - cur_proxy_cidrs = proxy_cidrs; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Self::apply_route_changes( + &ifcfg, + &ifname, + &net_ns, + &mut cur_proxy_cidrs, + added, + removed, + ) + .await; } }); diff --git a/easytier/src/instance_manager.rs b/easytier/src/instance_manager.rs index 059fecb6..e9934606 100644 --- a/easytier/src/instance_manager.rs +++ b/easytier/src/instance_manager.rs @@ -414,6 +414,16 @@ fn handle_event( GlobalCtxEvent::ConfigPatched(patch) => { print_event(instance_id, format!("config patched. patch: {:?}", patch)); } + + GlobalCtxEvent::ProxyCidrsUpdated(added, removed) => { + print_event( + instance_id, + format!( + "proxy CIDRs updated. added: {:?}, removed: {:?}", + added, removed + ), + ); + } } } else { events = events.resubscribe();