Implement requesting tun_fd with tokio channel. (#1734)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Burning_TNT
2026-01-04 21:04:43 +08:00
committed by GitHub
parent ee5227130c
commit 7f48ca54a3
+25 -13
View File
@@ -16,16 +16,20 @@ use crate::{
}; };
use anyhow::Context; use anyhow::Context;
use chrono::{DateTime, Local}; use chrono::{DateTime, Local};
use std::net::SocketAddr;
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
sync::{atomic::AtomicBool, Arc, RwLock}, net::SocketAddr,
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
};
use tokio::{
sync::{broadcast, mpsc},
task::JoinSet,
}; };
use tokio::{sync::broadcast, task::JoinSet};
pub type MyNodeInfo = crate::proto::api::manage::MyNodeInfo; pub type MyNodeInfo = crate::proto::api::manage::MyNodeInfo;
type ArcMutApiService = Arc<RwLock<Option<Arc<dyn InstanceRpcService>>>>; type ArcMutApiService = Arc<RwLock<Option<Arc<dyn InstanceRpcService>>>>;
type TunFd = Option<i32>;
#[derive(serde::Serialize, Clone)] #[derive(serde::Serialize, Clone)]
pub struct Event { pub struct Event {
@@ -35,7 +39,7 @@ pub struct Event {
struct EasyTierData { struct EasyTierData {
events: RwLock<VecDeque<Event>>, events: RwLock<VecDeque<Event>>,
tun_fd: Arc<RwLock<Option<i32>>>, tun_fd: (mpsc::Sender<TunFd>, Mutex<Option<mpsc::Receiver<TunFd>>>),
event_subscriber: RwLock<broadcast::Sender<GlobalCtxEvent>>, event_subscriber: RwLock<broadcast::Sender<GlobalCtxEvent>>,
instance_stop_notifier: Arc<tokio::sync::Notify>, instance_stop_notifier: Arc<tokio::sync::Notify>,
} }
@@ -43,10 +47,11 @@ struct EasyTierData {
impl Default for EasyTierData { impl Default for EasyTierData {
fn default() -> Self { fn default() -> Self {
let (tx, _) = broadcast::channel(16); let (tx, _) = broadcast::channel(16);
let (sender, receiver) = mpsc::channel(16);
Self { Self {
event_subscriber: RwLock::new(tx), event_subscriber: RwLock::new(tx),
events: RwLock::new(VecDeque::new()), events: RwLock::new(VecDeque::new()),
tun_fd: Arc::new(RwLock::new(None)), tun_fd: (sender, Mutex::new(Some(receiver))),
instance_stop_notifier: Arc::new(tokio::sync::Notify::new()), instance_stop_notifier: Arc::new(tokio::sync::Notify::new()),
} }
} }
@@ -98,24 +103,25 @@ impl EasyTierLauncher {
let peer_mgr = instance.get_peer_manager(); let peer_mgr = instance.get_peer_manager();
let nic_ctx = instance.get_nic_ctx(); let nic_ctx = instance.get_nic_ctx();
let peer_packet_receiver = instance.get_peer_packet_receiver(); let peer_packet_receiver = instance.get_peer_packet_receiver();
let arc_tun_fd = data.tun_fd.clone(); let mut tun_fd_receiver = data.tun_fd.1.lock().unwrap().take().unwrap();
tasks.spawn(async move { tasks.spawn(async move {
let mut old_tun_fd = arc_tun_fd.read().unwrap().clone(); let mut old_tun_fd = None;
loop { loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await; let Some(tun_fd) = tun_fd_receiver.recv().await.flatten() else {
let tun_fd = arc_tun_fd.read().unwrap().clone(); return;
if tun_fd != old_tun_fd && tun_fd.is_some() { };
if Some(tun_fd) != old_tun_fd {
let res = Instance::setup_nic_ctx_for_android( let res = Instance::setup_nic_ctx_for_android(
nic_ctx.clone(), nic_ctx.clone(),
global_ctx.clone(), global_ctx.clone(),
peer_mgr.clone(), peer_mgr.clone(),
peer_packet_receiver.clone(), peer_packet_receiver.clone(),
tun_fd.unwrap(), tun_fd,
) )
.await; .await;
if res.is_ok() { if res.is_ok() {
old_tun_fd = tun_fd; old_tun_fd = Some(tun_fd);
} }
} }
} }
@@ -395,10 +401,16 @@ impl NetworkInstance {
pub fn set_tun_fd(&mut self, tun_fd: i32) { pub fn set_tun_fd(&mut self, tun_fd: i32) {
if let Some(launcher) = self.launcher.as_ref() { if let Some(launcher) = self.launcher.as_ref() {
launcher.data.tun_fd.write().unwrap().replace(tun_fd); let _ = launcher.data.tun_fd.0.blocking_send(Some(tun_fd));
} }
} }
pub fn get_tun_fd_sender(&self) -> Option<mpsc::Sender<TunFd>> {
self.launcher
.as_ref()
.map(|launcher| launcher.data.tun_fd.0.clone())
}
pub fn start(&mut self) -> Result<EventBusSubscriber, anyhow::Error> { pub fn start(&mut self) -> Result<EventBusSubscriber, anyhow::Error> {
if self.is_easytier_running() { if self.is_easytier_running() {
return Ok(self.subscribe_event().unwrap()); return Ok(self.subscribe_event().unwrap());