mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-10 07:55:36 +00:00
[OHOS] feat: Enhance Rust kernel with config management and routing improvements (#2227)
* [OHOS.with ai] 将配置管理/配置分享/路由聚合/实例状态解析下沉至 Rust 内核,收敛职责并提升性能 (#2209) * feat: add ohrs config store and startup error logging * feat: full ability core for ohos * feat: full ability core for ohos * feat: clean code --------- Co-authored-by: FrankHan <frankhan@FrankHans-Mac-mini.local> * fix: 添加缺失文件 * fix: 修复更新路由启动两次TUN问题,并调整日志 * fix: rustfmt * fix: 适配Cidr忽略/32格式路由 * fix: 修复Option适配错误 * fix: rustfmt * fix: rustfmt --------- Co-authored-by: FrankHan <frankhan@FrankHans-Mac-mini.local>
This commit is contained in:
@@ -0,0 +1,196 @@
|
||||
use super::protocol::{TunRequestPayload, broadcast_local_socket_message};
|
||||
use crate::config::repository::kernel_socket_path;
|
||||
use crate::get_runtime_snapshot_inner;
|
||||
use crate::kernel_bridge::routing::aggregate_tun_routes;
|
||||
use ohos_hilog_binding::{hilog_error, hilog_info};
|
||||
use once_cell::sync::Lazy;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::io::ErrorKind;
|
||||
use std::os::unix::net::{UnixListener, UnixStream};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::time::Duration;
|
||||
|
||||
struct LocalSocketState {
|
||||
stop_flag: std::sync::Arc<AtomicBool>,
|
||||
socket_path: PathBuf,
|
||||
worker: JoinHandle<()>,
|
||||
}
|
||||
|
||||
static LOCAL_SOCKET_STATE: Lazy<Mutex<Option<LocalSocketState>>> = Lazy::new(|| Mutex::new(None));
|
||||
|
||||
pub fn start_local_socket_server() -> bool {
|
||||
let socket_path = match kernel_socket_path() {
|
||||
Some(path) => path,
|
||||
None => {
|
||||
hilog_error!("[Rust] kernel socket path unavailable");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
match LOCAL_SOCKET_STATE.lock() {
|
||||
Ok(guard) if guard.is_some() => return true,
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
hilog_error!("[Rust] lock localsocket state failed: {}", err);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if socket_path.exists() {
|
||||
let _ = std::fs::remove_file(&socket_path);
|
||||
}
|
||||
|
||||
let listener = match UnixListener::bind(&socket_path) {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
hilog_error!(
|
||||
"[Rust] bind localsocket failed {}: {}",
|
||||
socket_path.display(),
|
||||
err
|
||||
);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
if let Err(err) = listener.set_nonblocking(true) {
|
||||
hilog_error!("[Rust] set localsocket nonblocking failed: {}", err);
|
||||
let _ = std::fs::remove_file(&socket_path);
|
||||
return false;
|
||||
}
|
||||
|
||||
let stop_flag = std::sync::Arc::new(AtomicBool::new(false));
|
||||
let worker_stop_flag = stop_flag.clone();
|
||||
let worker = thread::spawn(move || {
|
||||
let mut last_snapshot_json = String::new();
|
||||
let mut delivered_tun_requests = HashSet::new();
|
||||
let mut last_tun_route_signatures = HashMap::<String, String>::new();
|
||||
let mut clients = Vec::<UnixStream>::new();
|
||||
|
||||
while !worker_stop_flag.load(Ordering::Relaxed) {
|
||||
let mut accepted_client = false;
|
||||
loop {
|
||||
match listener.accept() {
|
||||
Ok((stream, _addr)) => {
|
||||
accepted_client = true;
|
||||
clients.push(stream);
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::WouldBlock => break,
|
||||
Err(err) => {
|
||||
hilog_error!("[Rust] accept localsocket failed: {}", err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let snapshot = get_runtime_snapshot_inner();
|
||||
let snapshot_json = match serde_json::to_string(&snapshot) {
|
||||
Ok(json) => json,
|
||||
Err(err) => {
|
||||
hilog_error!("[Rust] serialize runtime snapshot failed: {}", err);
|
||||
thread::sleep(Duration::from_millis(250));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if accepted_client || snapshot_json != last_snapshot_json {
|
||||
let _ = broadcast_local_socket_message(
|
||||
&mut clients,
|
||||
"runtime_snapshot",
|
||||
&snapshot_json,
|
||||
);
|
||||
last_snapshot_json = snapshot_json;
|
||||
}
|
||||
|
||||
for instance in snapshot.instances.iter() {
|
||||
if instance.running && instance.tun_required {
|
||||
let virtual_ipv4 = instance
|
||||
.my_node_info
|
||||
.as_ref()
|
||||
.and_then(|info| info.virtual_ipv4.clone());
|
||||
let virtual_ipv4_cidr = instance
|
||||
.my_node_info
|
||||
.as_ref()
|
||||
.and_then(|info| info.virtual_ipv4_cidr.clone());
|
||||
if clients.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if virtual_ipv4.is_none() || virtual_ipv4_cidr.is_none() {
|
||||
continue;
|
||||
}
|
||||
let aggregated_routes = aggregate_tun_routes(instance);
|
||||
let route_signature = serde_json::to_string(&aggregated_routes)
|
||||
.unwrap_or_else(|_| "[]".to_string());
|
||||
let should_send = !delivered_tun_requests.contains(&instance.instance_id)
|
||||
|| last_tun_route_signatures
|
||||
.get(&instance.instance_id)
|
||||
.map(|value| value != &route_signature)
|
||||
.unwrap_or(true);
|
||||
if !should_send {
|
||||
continue;
|
||||
}
|
||||
let payload = TunRequestPayload {
|
||||
config_id: instance.config_id.clone(),
|
||||
instance_id: instance.instance_id.clone(),
|
||||
display_name: instance.display_name.clone(),
|
||||
virtual_ipv4,
|
||||
virtual_ipv4_cidr,
|
||||
aggregated_routes,
|
||||
magic_dns_enabled: instance.magic_dns_enabled,
|
||||
need_exit_node: instance.need_exit_node,
|
||||
};
|
||||
let payload_json = match serde_json::to_string(&payload) {
|
||||
Ok(json) => json,
|
||||
Err(err) => {
|
||||
hilog_error!("[Rust] serialize tun request failed: {}", err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if broadcast_local_socket_message(&mut clients, "tun_request", &payload_json) {
|
||||
delivered_tun_requests.insert(instance.instance_id.clone());
|
||||
last_tun_route_signatures
|
||||
.insert(instance.instance_id.clone(), route_signature);
|
||||
}
|
||||
} else {
|
||||
delivered_tun_requests.remove(&instance.instance_id);
|
||||
last_tun_route_signatures.remove(&instance.instance_id);
|
||||
}
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_millis(250));
|
||||
}
|
||||
});
|
||||
|
||||
match LOCAL_SOCKET_STATE.lock() {
|
||||
Ok(mut guard) => {
|
||||
*guard = Some(LocalSocketState {
|
||||
stop_flag,
|
||||
socket_path,
|
||||
worker,
|
||||
});
|
||||
true
|
||||
}
|
||||
Err(err) => {
|
||||
hilog_error!("[Rust] lock localsocket state failed: {}", err);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop_local_socket_server() -> bool {
|
||||
let state = match LOCAL_SOCKET_STATE.lock() {
|
||||
Ok(mut guard) => guard.take(),
|
||||
Err(err) => {
|
||||
hilog_error!("[Rust] lock localsocket state failed: {}", err);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(state) = state {
|
||||
state.stop_flag.store(true, Ordering::Relaxed);
|
||||
let _ = state.worker.join();
|
||||
let _ = std::fs::remove_file(state.socket_path);
|
||||
}
|
||||
true
|
||||
}
|
||||
Reference in New Issue
Block a user