use super::protocol::{TunRequestPayload, broadcast_local_socket_message}; use super::routing::aggregate_tun_routes; use crate::config::repository::kernel_socket_path; use crate::get_runtime_snapshot_inner; use ohos_hilog_binding::hilog_error; use once_cell::sync::Lazy; use std::collections::{HashMap, HashSet}; use std::io::ErrorKind; use std::os::unix::net::{UnixListener, UnixStream}; use std::path::PathBuf; use std::sync::Mutex; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::{self, JoinHandle}; use std::time::Duration; struct LocalSocketState { stop_flag: std::sync::Arc, socket_path: PathBuf, worker: JoinHandle<()>, } static LOCAL_SOCKET_STATE: Lazy>> = Lazy::new(|| Mutex::new(None)); pub fn start_local_socket_server() -> bool { let socket_path = match kernel_socket_path() { Some(path) => path, None => { hilog_error!("[Rust] kernel socket path unavailable"); return false; } }; match LOCAL_SOCKET_STATE.lock() { Ok(guard) if guard.is_some() => return true, Ok(_) => {} Err(err) => { hilog_error!("[Rust] lock localsocket state failed: {}", err); return false; } } if socket_path.exists() { let _ = std::fs::remove_file(&socket_path); } let listener = match UnixListener::bind(&socket_path) { Ok(listener) => listener, Err(err) => { hilog_error!("[Rust] bind localsocket failed {}: {}", socket_path.display(), err); return false; } }; if let Err(err) = listener.set_nonblocking(true) { hilog_error!("[Rust] set localsocket nonblocking failed: {}", err); let _ = std::fs::remove_file(&socket_path); return false; } let stop_flag = std::sync::Arc::new(AtomicBool::new(false)); let worker_stop_flag = stop_flag.clone(); let worker = thread::spawn(move || { let mut last_snapshot_json = String::new(); let mut delivered_tun_requests = HashSet::new(); let mut last_tun_route_signatures = HashMap::::new(); let mut clients = Vec::::new(); while !worker_stop_flag.load(Ordering::Relaxed) { let mut accepted_client = false; loop { match listener.accept() { Ok((stream, _addr)) => { accepted_client = true; clients.push(stream); } Err(err) if err.kind() == ErrorKind::WouldBlock => break, Err(err) => { hilog_error!("[Rust] accept localsocket failed: {}", err); break; } } } let snapshot = get_runtime_snapshot_inner(); let snapshot_json = match serde_json::to_string(&snapshot) { Ok(json) => json, Err(err) => { hilog_error!("[Rust] serialize runtime snapshot failed: {}", err); thread::sleep(Duration::from_millis(250)); continue; } }; if accepted_client || snapshot_json != last_snapshot_json { let _ = broadcast_local_socket_message(&mut clients, "runtime_snapshot", &snapshot_json); last_snapshot_json = snapshot_json; } for instance in snapshot.instances.iter() { if instance.running && instance.tun_required { let virtual_ipv4 = instance .my_node_info .as_ref() .and_then(|info| info.virtual_ipv4.clone()); let virtual_ipv4_cidr = instance .my_node_info .as_ref() .and_then(|info| info.virtual_ipv4_cidr.clone()); if clients.is_empty() { continue; } if virtual_ipv4.is_none() || virtual_ipv4_cidr.is_none() { continue; } let aggregated_routes = aggregate_tun_routes(instance); let route_signature = serde_json::to_string(&aggregated_routes) .unwrap_or_else(|_| "[]".to_string()); let should_send = !delivered_tun_requests.contains(&instance.instance_id) || last_tun_route_signatures .get(&instance.instance_id) .map(|value| value != &route_signature) .unwrap_or(true); if !should_send { continue; } let payload = TunRequestPayload { config_id: instance.config_id.clone(), instance_id: instance.instance_id.clone(), display_name: instance.display_name.clone(), virtual_ipv4, virtual_ipv4_cidr, aggregated_routes, magic_dns_enabled: instance.magic_dns_enabled, need_exit_node: instance.need_exit_node, }; let payload_json = match serde_json::to_string(&payload) { Ok(json) => json, Err(err) => { hilog_error!("[Rust] serialize tun request failed: {}", err); continue; } }; if broadcast_local_socket_message(&mut clients, "tun_request", &payload_json) { delivered_tun_requests.insert(instance.instance_id.clone()); last_tun_route_signatures.insert(instance.instance_id.clone(), route_signature); } } else { delivered_tun_requests.remove(&instance.instance_id); last_tun_route_signatures.remove(&instance.instance_id); } } thread::sleep(Duration::from_millis(250)); } }); match LOCAL_SOCKET_STATE.lock() { Ok(mut guard) => { *guard = Some(LocalSocketState { stop_flag, socket_path, worker, }); true } Err(err) => { hilog_error!("[Rust] lock localsocket state failed: {}", err); false } } } pub fn stop_local_socket_server() -> bool { let state = match LOCAL_SOCKET_STATE.lock() { Ok(mut guard) => guard.take(), Err(err) => { hilog_error!("[Rust] lock localsocket state failed: {}", err); return false; } }; if let Some(state) = state { state.stop_flag.store(true, Ordering::Relaxed); let _ = state.worker.join(); let _ = std::fs::remove_file(state.socket_path); } true }