mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-07 02:09:06 +00:00
fix(android): update vpn routes when proxy cidrs change (#1717)
This commit is contained in:
@@ -55,6 +55,8 @@ pub enum GlobalCtxEvent {
|
||||
PortForwardAdded(PortForwardConfigPb),
|
||||
|
||||
ConfigPatched(InstanceConfigPatch),
|
||||
|
||||
ProxyCidrsUpdated(Vec<cidr::Ipv4Cidr>, Vec<cidr::Ipv4Cidr>), // (added, removed)
|
||||
}
|
||||
|
||||
pub type EventBus = tokio::sync::broadcast::Sender<GlobalCtxEvent>;
|
||||
|
||||
@@ -534,6 +534,8 @@ pub struct Instance {
|
||||
#[cfg(feature = "socks5")]
|
||||
socks5_server: Arc<Socks5Server>,
|
||||
|
||||
proxy_cidrs_monitor: Option<ScopedTask<()>>,
|
||||
|
||||
global_ctx: ArcGlobalCtx,
|
||||
}
|
||||
|
||||
@@ -613,6 +615,8 @@ impl Instance {
|
||||
#[cfg(feature = "socks5")]
|
||||
socks5_server,
|
||||
|
||||
proxy_cidrs_monitor: None,
|
||||
|
||||
global_ctx,
|
||||
}
|
||||
}
|
||||
@@ -964,6 +968,12 @@ impl Instance {
|
||||
|
||||
self.add_initial_peers().await?;
|
||||
|
||||
let monitor = super::proxy_cidrs_monitor::ProxyCidrsMonitor::new(
|
||||
self.peer_manager.clone(),
|
||||
self.global_ctx.clone(),
|
||||
);
|
||||
self.proxy_cidrs_monitor = Some(monitor.start());
|
||||
|
||||
if self.global_ctx.get_vpn_portal_cidr().is_some() {
|
||||
self.run_vpn_portal().await?;
|
||||
}
|
||||
|
||||
@@ -4,5 +4,7 @@ pub mod instance;
|
||||
|
||||
pub mod listeners;
|
||||
|
||||
pub mod proxy_cidrs_monitor;
|
||||
|
||||
#[cfg(feature = "tun")]
|
||||
pub mod virtual_nic;
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
use std::collections::BTreeSet;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtxEvent};
|
||||
use crate::common::scoped_task::ScopedTask;
|
||||
use crate::peers::peer_manager::PeerManager;
|
||||
|
||||
/// ProxyCidrsMonitor monitors changes in proxy CIDRs from peer routes
|
||||
/// and emits GlobalCtxEvent::ProxyCidrsUpdated with added/removed diffs.
|
||||
pub struct ProxyCidrsMonitor {
|
||||
peer_mgr: Weak<PeerManager>,
|
||||
global_ctx: ArcGlobalCtx,
|
||||
}
|
||||
|
||||
impl ProxyCidrsMonitor {
|
||||
pub fn new(peer_mgr: Arc<PeerManager>, global_ctx: ArcGlobalCtx) -> Self {
|
||||
Self {
|
||||
peer_mgr: Arc::downgrade(&peer_mgr),
|
||||
global_ctx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Collects current proxy_cidrs from peer routes, VPN portal config, and manual routes.
|
||||
/// This is a static function that can be used for initial sync or recovery after Lagged errors.
|
||||
pub async fn diff_proxy_cidrs(
|
||||
peer_mgr: &PeerManager,
|
||||
global_ctx: &ArcGlobalCtx,
|
||||
cur_proxy_cidrs: &mut BTreeSet<cidr::Ipv4Cidr>,
|
||||
) -> (Vec<cidr::Ipv4Cidr>, Vec<cidr::Ipv4Cidr>) {
|
||||
// Collect proxy_cidrs from routes
|
||||
let mut proxy_cidrs = BTreeSet::new();
|
||||
let routes = peer_mgr.list_routes().await;
|
||||
for r in routes {
|
||||
for cidr in r.proxy_cidrs {
|
||||
let Ok(cidr) = cidr.parse::<cidr::Ipv4Cidr>() else {
|
||||
continue;
|
||||
};
|
||||
proxy_cidrs.insert(cidr);
|
||||
}
|
||||
}
|
||||
|
||||
// Add VPN portal cidr to proxy_cidrs
|
||||
if let Some(vpn_cfg) = global_ctx.config.get_vpn_portal_config() {
|
||||
proxy_cidrs.insert(vpn_cfg.client_cidr);
|
||||
}
|
||||
|
||||
// If has manual routes, override entire proxy_cidrs
|
||||
if let Some(routes) = global_ctx.config.get_routes() {
|
||||
proxy_cidrs = routes.into_iter().collect();
|
||||
}
|
||||
|
||||
// Calculate diff
|
||||
if cur_proxy_cidrs == &proxy_cidrs {
|
||||
return (Vec::new(), Vec::new());
|
||||
}
|
||||
let added: Vec<cidr::Ipv4Cidr> = proxy_cidrs.difference(cur_proxy_cidrs).cloned().collect();
|
||||
let removed: Vec<cidr::Ipv4Cidr> =
|
||||
cur_proxy_cidrs.difference(&proxy_cidrs).cloned().collect();
|
||||
|
||||
*cur_proxy_cidrs = proxy_cidrs;
|
||||
|
||||
(added, removed)
|
||||
}
|
||||
|
||||
/// Starts monitoring proxy_cidrs changes and emits events with diffs
|
||||
pub fn start(self) -> ScopedTask<()> {
|
||||
ScopedTask::from(tokio::spawn(async move {
|
||||
let mut cur_proxy_cidrs = BTreeSet::new();
|
||||
let mut last_update = None::<Instant>;
|
||||
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
let Some(peer_mgr) = self.peer_mgr.upgrade() else {
|
||||
tracing::warn!("peer manager dropped, stopping ProxyCidrsMonitor");
|
||||
break;
|
||||
};
|
||||
|
||||
// Check if route info has been updated
|
||||
let last_update_time = peer_mgr.get_route_peer_info_last_update_time().await;
|
||||
if last_update == Some(last_update_time) {
|
||||
continue;
|
||||
}
|
||||
last_update = Some(last_update_time);
|
||||
|
||||
let (added, removed) = Self::diff_proxy_cidrs(
|
||||
peer_mgr.as_ref(),
|
||||
&self.global_ctx,
|
||||
&mut cur_proxy_cidrs,
|
||||
)
|
||||
.await;
|
||||
|
||||
if added.is_empty() && removed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
self.global_ctx
|
||||
.issue_event(GlobalCtxEvent::ProxyCidrsUpdated(added, removed));
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ use crate::{
|
||||
global_ctx::{ArcGlobalCtx, GlobalCtxEvent},
|
||||
ifcfg::{IfConfiger, IfConfiguerTrait},
|
||||
},
|
||||
instance::proxy_cidrs_monitor::ProxyCidrsMonitor,
|
||||
peers::{peer_manager::PeerManager, recv_packet_from_chan, PacketRecvChanReceiver},
|
||||
tunnel::{
|
||||
common::{reserve_buf, FramedWriter, TunnelWrapper, ZCPacketToBytes},
|
||||
@@ -825,6 +826,57 @@ impl NicCtx {
|
||||
});
|
||||
}
|
||||
|
||||
async fn apply_route_changes(
|
||||
ifcfg: &impl IfConfiguerTrait,
|
||||
ifname: &str,
|
||||
net_ns: &crate::common::netns::NetNS,
|
||||
cur_proxy_cidrs: &mut BTreeSet<cidr::Ipv4Cidr>,
|
||||
added: Vec<cidr::Ipv4Cidr>,
|
||||
removed: Vec<cidr::Ipv4Cidr>,
|
||||
) {
|
||||
tracing::debug!(?added, ?removed, "applying proxy_cidrs route changes");
|
||||
|
||||
// Remove routes
|
||||
for cidr in removed {
|
||||
if !cur_proxy_cidrs.contains(&cidr) {
|
||||
continue;
|
||||
}
|
||||
let _g = net_ns.guard();
|
||||
let ret = ifcfg
|
||||
.remove_ipv4_route(ifname, cidr.first_address(), cidr.network_length())
|
||||
.await;
|
||||
|
||||
if ret.is_err() {
|
||||
tracing::trace!(
|
||||
cidr = ?cidr,
|
||||
err = ?ret,
|
||||
"remove route failed.",
|
||||
);
|
||||
}
|
||||
cur_proxy_cidrs.remove(&cidr);
|
||||
}
|
||||
|
||||
// Add routes
|
||||
for cidr in added {
|
||||
if cur_proxy_cidrs.contains(&cidr) {
|
||||
continue;
|
||||
}
|
||||
let _g = net_ns.guard();
|
||||
let ret = ifcfg
|
||||
.add_ipv4_route(ifname, cidr.first_address(), cidr.network_length(), None)
|
||||
.await;
|
||||
|
||||
if ret.is_err() {
|
||||
tracing::trace!(
|
||||
cidr = ?cidr,
|
||||
err = ?ret,
|
||||
"add route failed.",
|
||||
);
|
||||
}
|
||||
cur_proxy_cidrs.insert(cidr);
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_proxy_cidrs_route_updater(&mut self) -> Result<(), Error> {
|
||||
let Some(peer_mgr) = self.peer_mgr.upgrade() else {
|
||||
return Err(anyhow::anyhow!("peer manager not available").into());
|
||||
@@ -834,79 +886,66 @@ impl NicCtx {
|
||||
let nic = self.nic.lock().await;
|
||||
let ifcfg = nic.get_ifcfg();
|
||||
let ifname = nic.ifname().to_owned();
|
||||
let mut event_receiver = global_ctx.subscribe();
|
||||
|
||||
self.tasks.spawn(async move {
|
||||
let mut cur_proxy_cidrs = BTreeSet::new();
|
||||
let mut cur_proxy_cidrs = BTreeSet::<cidr::Ipv4Cidr>::new();
|
||||
|
||||
// Initial sync: get current proxy_cidrs state and apply routes
|
||||
let (added, removed) = ProxyCidrsMonitor::diff_proxy_cidrs(
|
||||
peer_mgr.as_ref(),
|
||||
&global_ctx,
|
||||
&mut cur_proxy_cidrs,
|
||||
)
|
||||
.await;
|
||||
Self::apply_route_changes(
|
||||
&ifcfg,
|
||||
&ifname,
|
||||
&net_ns,
|
||||
&mut cur_proxy_cidrs,
|
||||
added,
|
||||
removed,
|
||||
)
|
||||
.await;
|
||||
|
||||
loop {
|
||||
let mut proxy_cidrs = BTreeSet::new();
|
||||
let routes = peer_mgr.list_routes().await;
|
||||
for r in routes {
|
||||
for cidr in r.proxy_cidrs {
|
||||
let Ok(cidr) = cidr.parse::<cidr::Ipv4Cidr>() else {
|
||||
continue;
|
||||
};
|
||||
proxy_cidrs.insert(cidr);
|
||||
let event = match event_receiver.recv().await {
|
||||
Ok(event) => event,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
tracing::debug!("event bus closed, stopping proxy_cidrs route updater");
|
||||
break;
|
||||
}
|
||||
}
|
||||
// add vpn portal cidr to proxy_cidrs
|
||||
if let Some(vpn_cfg) = global_ctx.config.get_vpn_portal_config() {
|
||||
proxy_cidrs.insert(vpn_cfg.client_cidr);
|
||||
}
|
||||
|
||||
if let Some(routes) = global_ctx.config.get_routes() {
|
||||
// if has manual routes, just override entire proxy_cidrs
|
||||
proxy_cidrs = routes.into_iter().collect();
|
||||
}
|
||||
|
||||
// if route is in cur_proxy_cidrs but not in proxy_cidrs, delete it.
|
||||
for cidr in cur_proxy_cidrs.iter() {
|
||||
if proxy_cidrs.contains(cidr) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let _g = net_ns.guard();
|
||||
let ret = ifcfg
|
||||
.remove_ipv4_route(
|
||||
ifname.as_str(),
|
||||
cidr.first_address(),
|
||||
cidr.network_length(),
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
||||
tracing::warn!(
|
||||
"event bus lagged in proxy_cidrs route updater, doing full sync"
|
||||
);
|
||||
event_receiver = event_receiver.resubscribe();
|
||||
// Full sync after lagged to recover consistent state
|
||||
let (added, removed) = ProxyCidrsMonitor::diff_proxy_cidrs(
|
||||
peer_mgr.as_ref(),
|
||||
&global_ctx,
|
||||
&mut cur_proxy_cidrs,
|
||||
)
|
||||
.await;
|
||||
|
||||
if ret.is_err() {
|
||||
tracing::trace!(
|
||||
cidr = ?cidr,
|
||||
err = ?ret,
|
||||
"remove route failed.",
|
||||
);
|
||||
GlobalCtxEvent::ProxyCidrsUpdated(added, removed)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for cidr in proxy_cidrs.iter() {
|
||||
if cur_proxy_cidrs.contains(cidr) {
|
||||
continue;
|
||||
}
|
||||
let _g = net_ns.guard();
|
||||
let ret = ifcfg
|
||||
.add_ipv4_route(
|
||||
ifname.as_str(),
|
||||
cidr.first_address(),
|
||||
cidr.network_length(),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
// Only handle ProxyCidrsUpdated events
|
||||
let (added, removed) = match event {
|
||||
GlobalCtxEvent::ProxyCidrsUpdated(added, removed) => (added, removed),
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
if ret.is_err() {
|
||||
tracing::trace!(
|
||||
cidr = ?cidr,
|
||||
err = ?ret,
|
||||
"add route failed.",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
cur_proxy_cidrs = proxy_cidrs;
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
Self::apply_route_changes(
|
||||
&ifcfg,
|
||||
&ifname,
|
||||
&net_ns,
|
||||
&mut cur_proxy_cidrs,
|
||||
added,
|
||||
removed,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -414,6 +414,16 @@ fn handle_event(
|
||||
GlobalCtxEvent::ConfigPatched(patch) => {
|
||||
print_event(instance_id, format!("config patched. patch: {:?}", patch));
|
||||
}
|
||||
|
||||
GlobalCtxEvent::ProxyCidrsUpdated(added, removed) => {
|
||||
print_event(
|
||||
instance_id,
|
||||
format!(
|
||||
"proxy CIDRs updated. added: {:?}, removed: {:?}",
|
||||
added, removed
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
events = events.resubscribe();
|
||||
|
||||
Reference in New Issue
Block a user