From 2bfdd447592cfe343713a0c6c7c9398c05a9a1f7 Mon Sep 17 00:00:00 2001 From: KKRainbow <443152178@qq.com> Date: Sat, 21 Mar 2026 21:06:07 +0800 Subject: [PATCH] multi_fix: harden peer/session handling, tighten foreign-network trust, and improve web client metadata (#1999) * machine-id should be scoped unbder same user-id * feat: report device os metadata to console * fix sync root key cause packet loss * fix tun packet not invalid * fix faketcp cause lat jitter * fix some packet not decrypt * fix peer info patch, improve performance of update self info * fix foreign credential identity mismatch handling --- easytier-web/src/client_manager/mod.rs | 26 +- easytier-web/src/client_manager/session.rs | 11 + easytier-web/src/client_manager/storage.rs | 54 +- easytier-web/src/restful/mod.rs | 8 +- easytier-web/src/restful/network.rs | 31 +- easytier-web/src/restful/rpc.rs | 8 +- easytier-web/src/webhook.rs | 9 + easytier/src/common/global_ctx.rs | 56 +- easytier/src/common/mod.rs | 1 + easytier/src/common/os_info.rs | 144 ++++ easytier/src/peers/foreign_network_manager.rs | 178 ++++- easytier/src/peers/peer.rs | 110 ++- easytier/src/peers/peer_conn.rs | 39 +- easytier/src/peers/peer_manager.rs | 23 +- easytier/src/peers/peer_map.rs | 15 +- easytier/src/peers/peer_ospf_route.rs | 724 +++++++++++++----- easytier/src/peers/peer_session.rs | 226 +++++- easytier/src/peers/route_trait.rs | 6 + easytier/src/proto/web.proto | 7 + easytier/src/tunnel/fake_tcp/mod.rs | 18 +- .../tunnel/fake_tcp/netfilter/linux_bpf.rs | 25 +- easytier/src/web_client/controller.rs | 13 +- easytier/src/web_client/mod.rs | 5 +- easytier/src/web_client/session.rs | 2 + 24 files changed, 1381 insertions(+), 358 deletions(-) create mode 100644 easytier/src/common/os_info.rs diff --git a/easytier-web/src/client_manager/mod.rs b/easytier-web/src/client_manager/mod.rs index 69e9355f..074a5631 100644 --- a/easytier-web/src/client_manager/mod.rs +++ b/easytier-web/src/client_manager/mod.rs @@ -175,27 +175,15 @@ impl ClientManager { .map(|item| item.value().clone()) } - /// Find a session by machine_id regardless of user_id. - pub fn get_session_by_machine_id_global( + pub async fn disconnect_session_by_machine_id( &self, + user_id: UserIdInDb, machine_id: &uuid::Uuid, - ) -> Option> { - self.storage - .get_client_url_by_machine_id_global(machine_id) - .and_then(|url| { - self.client_sessions - .get(&url) - .map(|item| item.value().clone()) - }) - } - - /// Get user_id associated with a machine_id. - pub fn get_user_id_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> Option { - self.storage.get_user_id_by_machine_id_global(machine_id) - } - - pub async fn disconnect_session_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> bool { - let Some(client_url) = self.storage.get_client_url_by_machine_id_global(machine_id) else { + ) -> bool { + let Some(client_url) = self + .storage + .get_client_url_by_machine_id(user_id, machine_id) + else { return false; }; let Some((_, session)) = self.client_sessions.remove(&client_url) else { diff --git a/easytier-web/src/client_manager/session.rs b/easytier-web/src/client_manager/session.rs index 9c695fe1..4675c6de 100644 --- a/easytier-web/src/client_manager/session.rs +++ b/easytier-web/src/client_manager/session.rs @@ -88,12 +88,16 @@ impl Drop for SessionData { if self.webhook_config.is_enabled() { let webhook = self.webhook_config.clone(); let machine_id = token.machine_id.to_string(); + let user_id = Some(token.user_id); + let token_value = token.token.clone(); let web_instance_id = webhook.web_instance_id.clone(); let binding_version = self.binding_version; tokio::spawn(async move { webhook .notify_node_disconnected(&crate::webhook::NodeDisconnectedRequest { machine_id, + token: token_value, + user_id, web_instance_id, binding_version, }) @@ -190,6 +194,9 @@ impl SessionRpcService { machine_id: machine_id.to_string(), hostname: req.hostname.clone(), version: req.easytier_version.clone(), + os_type: req.device_os.as_ref().map(|info| info.os_type.clone()), + os_version: req.device_os.as_ref().map(|info| info.version.clone()), + os_distribution: req.device_os.as_ref().map(|info| info.distribution.clone()), web_instance_id: data.webhook_config.web_instance_id.clone(), web_instance_api_base_url: data.webhook_config.web_instance_api_base_url.clone(), }; @@ -283,8 +290,12 @@ impl SessionRpcService { let connect_req = crate::webhook::NodeConnectedRequest { machine_id: machine_id.to_string(), token: req.user_token.clone(), + user_id: Some(user_id), hostname: req.hostname.clone(), version: req.easytier_version.clone(), + os_type: req.device_os.as_ref().map(|info| info.os_type.clone()), + os_version: req.device_os.as_ref().map(|info| info.version.clone()), + os_distribution: req.device_os.as_ref().map(|info| info.distribution.clone()), web_instance_id: webhook.web_instance_id.clone(), binding_version, }; diff --git a/easytier-web/src/client_manager/storage.rs b/easytier-web/src/client_manager/storage.rs index 7e63d740..73291986 100644 --- a/easytier-web/src/client_manager/storage.rs +++ b/easytier-web/src/client_manager/storage.rs @@ -22,7 +22,6 @@ struct ClientInfo { #[derive(Debug)] pub struct StorageInner { user_clients_map: DashMap>, - global_machine_map: DashMap, pub db: Db, } @@ -42,7 +41,6 @@ impl Storage { pub fn new(db: Db) -> Self { Storage(Arc::new(StorageInner { user_clients_map: DashMap::new(), - global_machine_map: DashMap::new(), db, })) } @@ -75,13 +73,10 @@ impl Storage { storage_token: stoken.clone(), report_time, }; - Self::update_client_info_map(&inner, &client_info); - Self::update_client_info_map(&self.0.global_machine_map, &client_info); } pub fn remove_client(&self, stoken: &StorageToken) { - Self::remove_client_info_map(&self.0.global_machine_map, stoken); self.0 .user_clients_map .remove_if(&stoken.user_id, |_, set| { @@ -106,22 +101,6 @@ impl Storage { }) } - /// Find client_url by machine_id across all users. - pub fn get_client_url_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> Option { - self.0 - .global_machine_map - .get(machine_id) - .map(|info| info.storage_token.client_url.clone()) - } - - /// Find user_id by machine_id across all users. - pub fn get_user_id_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> Option { - self.0 - .global_machine_map - .get(machine_id) - .map(|info| info.storage_token.user_id) - } - pub fn list_user_clients(&self, user_id: UserIdInDb) -> Vec { self.0 .user_clients_map @@ -164,38 +143,35 @@ mod tests { } #[tokio::test] - async fn global_machine_index_uses_latest_report_and_ignores_stale_removal() { + async fn machine_id_is_scoped_within_each_user() { let storage = Storage::new(Db::memory_db().await); let machine_id = uuid::Uuid::new_v4(); - let old_token = make_storage_token(1, machine_id, "tcp://127.0.0.1:1001"); - let new_token = make_storage_token(1, machine_id, "tcp://127.0.0.1:1002"); + let user1_token = make_storage_token(1, machine_id, "tcp://127.0.0.1:1001"); + let user2_token = make_storage_token(2, machine_id, "tcp://127.0.0.1:1002"); - storage.update_client(old_token.clone(), 10); - storage.update_client(new_token.clone(), 20); + storage.update_client(user1_token.clone(), 10); + storage.update_client(user2_token.clone(), 20); assert_eq!( - storage.get_client_url_by_machine_id_global(&machine_id), - Some(new_token.client_url.clone()) + storage.get_client_url_by_machine_id(1, &machine_id), + Some(user1_token.client_url.clone()) ); assert_eq!( - storage.get_user_id_by_machine_id_global(&machine_id), - Some(1) + storage.get_client_url_by_machine_id(2, &machine_id), + Some(user2_token.client_url.clone()) ); - storage.remove_client(&old_token); + storage.remove_client(&user1_token); + assert_eq!(storage.get_client_url_by_machine_id(1, &machine_id), None); assert_eq!( - storage.get_client_url_by_machine_id_global(&machine_id), - Some(new_token.client_url.clone()) + storage.get_client_url_by_machine_id(2, &machine_id), + Some(user2_token.client_url.clone()) ); - storage.remove_client(&new_token); + storage.remove_client(&user2_token); - assert_eq!( - storage.get_client_url_by_machine_id_global(&machine_id), - None - ); - assert_eq!(storage.get_user_id_by_machine_id_global(&machine_id), None); + assert_eq!(storage.get_client_url_by_machine_id(2, &machine_id), None); } } diff --git a/easytier-web/src/restful/mod.rs b/easytier-web/src/restful/mod.rs index d87cf438..9fd2bc43 100644 --- a/easytier-web/src/restful/mod.rs +++ b/easytier-web/src/restful/mod.rs @@ -31,7 +31,7 @@ use users::{AuthSession, Backend}; use crate::client_manager::storage::StorageToken; use crate::client_manager::ClientManager; -use crate::db::Db; +use crate::db::{Db, UserIdInDb}; use crate::webhook::SharedWebhookConfig; use crate::FeatureFlags; @@ -252,7 +252,7 @@ impl RestfulServer { get(Self::handle_list_all_sessions_internal), ) .route( - "/api/internal/sessions/:machine-id", + "/api/internal/users/:user-id/sessions/:machine-id", delete(Self::handle_disconnect_session_internal), ) .merge(NetworkApi::build_route_internal()) @@ -315,11 +315,11 @@ impl RestfulServer { } async fn handle_disconnect_session_internal( - Path(machine_id): Path, + Path((user_id, machine_id)): Path<(UserIdInDb, uuid::Uuid)>, State(client_mgr): AppState, ) -> Result { if client_mgr - .disconnect_session_by_machine_id_global(&machine_id) + .disconnect_session_by_machine_id(user_id, &machine_id) .await { Ok(StatusCode::NO_CONTENT) diff --git a/easytier-web/src/restful/network.rs b/easytier-web/src/restful/network.rs index 9fdf772f..e97e010b 100644 --- a/easytier-web/src/restful/network.rs +++ b/easytier-web/src/restful/network.rs @@ -299,10 +299,9 @@ impl NetworkApi { async fn handle_run_network_instance_internal( State(client_mgr): AppState, - Path(machine_id): Path, + Path((user_id, machine_id)): Path<(UserIdInDb, uuid::Uuid)>, Json(payload): Json, ) -> Result, HttpHandleError> { - let user_id = Self::get_user_id_from_machine(&client_mgr, &machine_id)?; client_mgr .handle_run_network_instance((user_id, machine_id), payload.config, payload.save) .await @@ -312,9 +311,8 @@ impl NetworkApi { async fn handle_remove_network_instance_internal( State(client_mgr): AppState, - Path((machine_id, inst_id)): Path<(uuid::Uuid, uuid::Uuid)>, + Path((user_id, machine_id, inst_id)): Path<(UserIdInDb, uuid::Uuid, uuid::Uuid)>, ) -> Result<(), HttpHandleError> { - let user_id = Self::get_user_id_from_machine(&client_mgr, &machine_id)?; client_mgr .handle_remove_network_instances((user_id, machine_id), vec![inst_id]) .await @@ -323,9 +321,8 @@ impl NetworkApi { async fn handle_list_network_instance_ids_internal( State(client_mgr): AppState, - Path(machine_id): Path, + Path((user_id, machine_id)): Path<(UserIdInDb, uuid::Uuid)>, ) -> Result, HttpHandleError> { - let user_id = Self::get_user_id_from_machine(&client_mgr, &machine_id)?; Ok(client_mgr .handle_list_network_instance_ids((user_id, machine_id)) .await @@ -335,10 +332,9 @@ impl NetworkApi { async fn handle_collect_network_info_internal( State(client_mgr): AppState, - Path(machine_id): Path, + Path((user_id, machine_id)): Path<(UserIdInDb, uuid::Uuid)>, Json(payload): Json, ) -> Result, HttpHandleError> { - let user_id = Self::get_user_id_from_machine(&client_mgr, &machine_id)?; Ok(client_mgr .handle_collect_network_info((user_id, machine_id), payload.inst_ids) .await @@ -346,32 +342,19 @@ impl NetworkApi { .into()) } - /// Look up user_id from a machine's active session token. - fn get_user_id_from_machine( - client_mgr: &AppStateInner, - machine_id: &uuid::Uuid, - ) -> Result { - client_mgr - .get_user_id_by_machine_id_global(machine_id) - .ok_or(( - StatusCode::NOT_FOUND, - other_error("Machine not found").into(), - )) - } - pub fn build_route_internal() -> Router { Router::new() .route( - "/api/internal/machines/:machine-id/networks", + "/api/internal/users/:user-id/machines/:machine-id/networks", post(Self::handle_run_network_instance_internal) .get(Self::handle_list_network_instance_ids_internal), ) .route( - "/api/internal/machines/:machine-id/networks/:inst-id", + "/api/internal/users/:user-id/machines/:machine-id/networks/:inst-id", delete(Self::handle_remove_network_instance_internal), ) .route( - "/api/internal/machines/:machine-id/networks/info", + "/api/internal/users/:user-id/machines/:machine-id/networks/info", get(Self::handle_collect_network_info_internal), ) } diff --git a/easytier-web/src/restful/rpc.rs b/easytier-web/src/restful/rpc.rs index 458bc642..7d2ec032 100644 --- a/easytier-web/src/restful/rpc.rs +++ b/easytier-web/src/restful/rpc.rs @@ -7,6 +7,8 @@ use axum::{ use axum_login::AuthUser as _; use easytier::proto::rpc_types::controller::BaseController; +use crate::db::UserIdInDb; + use super::{other_error, AppState, HttpHandleError}; #[derive(Debug, serde::Deserialize)] @@ -162,11 +164,11 @@ pub fn router() -> Router { /// Internal proxy-rpc handler: no AuthSession, resolves the active session by machine_id. pub async fn handle_proxy_rpc_internal( State(client_mgr): AppState, - Path(machine_id): Path, + Path((user_id, machine_id)): Path<(UserIdInDb, uuid::Uuid)>, Json(req): Json, ) -> Result, HttpHandleError> { let session = client_mgr - .get_session_by_machine_id_global(&machine_id) + .get_session_by_machine_id(user_id, &machine_id) .ok_or(( StatusCode::NOT_FOUND, other_error("Session not found").into(), @@ -176,7 +178,7 @@ pub async fn handle_proxy_rpc_internal( pub fn router_internal() -> Router { Router::new().route( - "/api/internal/machines/:machine-id/proxy-rpc", + "/api/internal/users/:user-id/machines/:machine-id/proxy-rpc", post(handle_proxy_rpc_internal), ) } diff --git a/easytier-web/src/webhook.rs b/easytier-web/src/webhook.rs index 86315512..29ad5043 100644 --- a/easytier-web/src/webhook.rs +++ b/easytier-web/src/webhook.rs @@ -51,6 +51,9 @@ pub struct ValidateTokenRequest { pub machine_id: String, pub hostname: String, pub version: String, + pub os_type: Option, + pub os_version: Option, + pub os_distribution: Option, pub web_instance_id: Option, pub web_instance_api_base_url: Option, } @@ -69,8 +72,12 @@ pub struct ValidateTokenResponse { pub struct NodeConnectedRequest { pub machine_id: String, pub token: String, + pub user_id: Option, pub hostname: String, pub version: String, + pub os_type: Option, + pub os_version: Option, + pub os_distribution: Option, pub web_instance_id: Option, pub binding_version: Option, } @@ -78,6 +85,8 @@ pub struct NodeConnectedRequest { #[derive(Debug, Serialize)] pub struct NodeDisconnectedRequest { pub machine_id: String, + pub token: String, + pub user_id: Option, pub web_instance_id: Option, pub binding_version: Option, } diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index bb1a6558..dd291e9c 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -134,6 +134,15 @@ impl TrustedKeyMapManager { } pub fn verify_trusted_key(&self, pubkey: &[u8], network_name: &str) -> bool { + self.verify_trusted_key_with_source(pubkey, network_name, None) + } + + pub fn verify_trusted_key_with_source( + &self, + pubkey: &[u8], + network_name: &str, + source: Option, + ) -> bool { let Some(trusted_keys) = self .network_trusted_keys .get(network_name) @@ -146,7 +155,11 @@ impl TrustedKeyMapManager { return false; }; - !metadata.is_expired() + if let Some(source) = source { + metadata.source == source && !metadata.is_expired() + } else { + !metadata.is_expired() + } } pub fn list_trusted_keys(&self, network_name: &str) -> Vec<(Vec, TrustedKeyMetadata)> { @@ -542,6 +555,16 @@ impl GlobalCtx { false } + pub fn is_pubkey_trusted_with_source( + &self, + pubkey: &[u8], + network_name: &str, + source: TrustedKeySource, + ) -> bool { + self.trusted_keys + .verify_trusted_key_with_source(pubkey, network_name, Some(source)) + } + /// Atomically replace all OSPF trusted keys with a new set /// Called by OSPF route layer after each route update pub fn update_trusted_keys(&self, keys: TrustedKeyMap, network_name: &str) { @@ -676,6 +699,37 @@ pub mod tests { ); } + #[tokio::test] + async fn trusted_key_source_lookup_is_precise() { + let config = TomlConfigLoader::default(); + let global_ctx = GlobalCtx::new(config); + let network_name = "net1"; + let pubkey = vec![1; 32]; + + global_ctx.update_trusted_keys( + HashMap::from([( + pubkey.clone(), + TrustedKeyMetadata { + source: TrustedKeySource::OspfCredential, + expiry_unix: None, + }, + )]), + network_name, + ); + + assert!(global_ctx.is_pubkey_trusted(&pubkey, network_name)); + assert!(!global_ctx.is_pubkey_trusted_with_source( + &pubkey, + network_name, + TrustedKeySource::OspfNode, + )); + assert!(global_ctx.is_pubkey_trusted_with_source( + &pubkey, + network_name, + TrustedKeySource::OspfCredential, + )); + } + pub fn get_mock_global_ctx_with_network( network_identy: Option, ) -> ArcGlobalCtx { diff --git a/easytier/src/common/mod.rs b/easytier/src/common/mod.rs index a7bbbbaa..86607a54 100644 --- a/easytier/src/common/mod.rs +++ b/easytier/src/common/mod.rs @@ -24,6 +24,7 @@ pub mod ifcfg; pub mod log; pub mod netns; pub mod network; +pub mod os_info; pub mod scoped_task; pub mod stats_manager; pub mod stun; diff --git a/easytier/src/common/os_info.rs b/easytier/src/common/os_info.rs new file mode 100644 index 00000000..dcd78a38 --- /dev/null +++ b/easytier/src/common/os_info.rs @@ -0,0 +1,144 @@ +use std::{collections::HashMap, fs, process::Command}; + +use crate::proto::web::DeviceOsInfo; + +pub fn collect_device_os_info() -> DeviceOsInfo { + let os_type = normalize_os_type(std::env::consts::OS); + let (version, distribution) = detect_os_version_and_distribution(&os_type); + + DeviceOsInfo { + os_type, + version, + distribution, + } +} + +fn normalize_os_type(raw: &str) -> String { + match raw { + "macos" => "macos".to_string(), + "windows" => "windows".to_string(), + "linux" => "linux".to_string(), + "android" => "android".to_string(), + "ios" => "ios".to_string(), + "freebsd" => "freebsd".to_string(), + other => other.to_string(), + } +} + +fn detect_os_version_and_distribution(os_type: &str) -> (String, String) { + match os_type { + "linux" | "android" => linux_version_and_distribution(os_type), + "macos" => ( + first_non_empty([ + command_output("sw_vers", &["-productVersion"]), + unix_kernel_release(), + ]), + "macOS".to_string(), + ), + "windows" => ( + first_non_empty([windows_version(), None]), + "Windows".to_string(), + ), + "freebsd" => ( + first_non_empty([ + command_output("freebsd-version", &[]), + unix_kernel_release(), + ]), + "FreeBSD".to_string(), + ), + other => ( + unix_kernel_release().unwrap_or_else(|| "unknown".to_string()), + other.to_string(), + ), + } +} + +fn linux_version_and_distribution(os_type: &str) -> (String, String) { + let os_release = parse_os_release().unwrap_or_default(); + let version = first_non_empty([ + os_release.get("VERSION_ID").cloned(), + os_release.get("VERSION").cloned(), + unix_kernel_release(), + ]); + let distribution = first_non_empty([ + os_release.get("NAME").cloned(), + os_release.get("ID").cloned().map(title_case), + Some(if os_type == "android" { + "Android".to_string() + } else { + "Linux".to_string() + }), + ]); + (version, distribution) +} + +fn parse_os_release() -> Option> { + ["/etc/os-release", "/usr/lib/os-release"] + .into_iter() + .find_map(|path| fs::read_to_string(path).ok()) + .map(|content| { + content + .lines() + .filter_map(|line| { + let line = line.trim(); + if line.is_empty() || line.starts_with('#') { + return None; + } + let (key, value) = line.split_once('=')?; + Some((key.to_string(), trim_os_release_value(value))) + }) + .collect() + }) +} + +fn trim_os_release_value(value: &str) -> String { + value + .trim() + .trim_matches('"') + .trim_matches('\'') + .to_string() +} + +fn unix_kernel_release() -> Option { + command_output("uname", &["-r"]) +} + +fn windows_version() -> Option { + let output = command_output("cmd", &["/C", "ver"])?; + output + .split("Version") + .nth(1) + .map(str::trim) + .map(|part| part.trim_matches(&['[', ']'][..]).to_string()) + .filter(|value| !value.is_empty()) +} + +fn command_output(program: &str, args: &[&str]) -> Option { + let output = Command::new(program).args(args).output().ok()?; + if !output.status.success() { + return None; + } + let value = String::from_utf8(output.stdout).ok()?; + let value = value.trim(); + if value.is_empty() { + None + } else { + Some(value.to_string()) + } +} + +fn first_non_empty(values: [Option; N]) -> String { + values + .into_iter() + .flatten() + .find(|value| !value.trim().is_empty()) + .unwrap_or_else(|| "unknown".to_string()) +} + +fn title_case(value: String) -> String { + let mut chars = value.chars(); + let Some(first) = chars.next() else { + return value; + }; + first.to_uppercase().collect::() + chars.as_str() +} diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 8a308c43..e05724be 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -303,6 +303,26 @@ impl ForeignNetworkEntry { fn my_peer_id(&self) -> PeerId { self.my_peer_id } + + fn need_periodic_requery_peers(&self) -> bool { + true + } + + async fn get_peer_identity_type(&self, peer_id: PeerId) -> Option { + let peer_map = self.peer_map.upgrade()?; + peer_map.get_peer_identity_type(peer_id) + } + + async fn get_peer_public_key(&self, peer_id: PeerId) -> Option> { + let peer_map = self.peer_map.upgrade()?; + peer_map.get_peer_public_key(peer_id) + } + + async fn close_peer(&self, peer_id: PeerId) { + if let Some(peer_map) = self.peer_map.upgrade() { + let _ = peer_map.close_peer(peer_id).await; + } + } } let route = PeerRoute::new( @@ -373,15 +393,15 @@ impl ForeignNetworkEntry { continue; } - if !peer_map.has_peer(from_peer_id) && relay_peer_map.is_secure_mode_enabled() { + if relay_peer_map.is_secure_mode_enabled() && hdr.is_encrypted() { match relay_peer_map.decrypt_if_needed(&mut zc_packet).await { Ok(true) => {} Ok(false) => { - tracing::error!("relay session not found"); + tracing::error!("secure session not found"); continue; } Err(e) => { - tracing::error!(?e, "relay decrypt failed"); + tracing::error!(?e, "secure decrypt failed"); continue; } } @@ -620,14 +640,27 @@ pub struct ForeignNetworkManager { } impl ForeignNetworkManager { - async fn is_shared_pubkey_trusted( + fn network_secret_digest_is_empty(network: &NetworkIdentity) -> bool { + network + .network_secret_digest + .as_ref() + .is_none_or(|d| d.iter().all(|b| *b == 0)) + } + + fn should_reject_credential_trust_path(identity_type: PeerIdentityType) -> bool { + matches!(identity_type, PeerIdentityType::Admin) + } + + async fn is_credential_pubkey_trusted( entry: &ForeignNetworkEntry, remote_static_pubkey: &[u8], ) -> bool { remote_static_pubkey.len() == 32 - && entry - .global_ctx - .is_pubkey_trusted(remote_static_pubkey, &entry.network.network_name) + && entry.global_ctx.is_pubkey_trusted_with_source( + remote_static_pubkey, + &entry.network.network_name, + TrustedKeySource::OspfCredential, + ) } fn build_trusted_key_items(entry: &ForeignNetworkEntry) -> Vec { @@ -697,6 +730,20 @@ impl ForeignNetworkManager { return ret; } + let peer_digest_empty = Self::network_secret_digest_is_empty(&peer_network); + if peer_digest_empty + && self + .data + .get_network_entry(&peer_network.network_name) + .is_none() + { + return Err(anyhow::anyhow!( + "foreign network {} is not established by a secret-verified peer yet", + peer_network.network_name + ) + .into()); + } + let (entry, new_added) = self .data .get_or_insert_entry( @@ -711,13 +758,17 @@ impl ForeignNetworkManager { .await; let same_identity = entry.network == peer_network; - let shared_peer = peer_conn.get_peer_identity_type() == PeerIdentityType::SharedNode; - let shared_peer_trusted = shared_peer - && Self::is_shared_pubkey_trusted(&entry, &conn_info.noise_remote_static_pubkey).await; + let peer_identity_type = peer_conn.get_peer_identity_type(); + let credential_peer_trusted = peer_digest_empty + && Self::is_credential_pubkey_trusted(&entry, &conn_info.noise_remote_static_pubkey) + .await; + let credential_identity_mismatch = credential_peer_trusted + && Self::should_reject_credential_trust_path(peer_identity_type); let _g = entry.lock.lock().await; - if (!(same_identity || shared_peer_trusted)) + if (!(same_identity || credential_peer_trusted)) + || credential_identity_mismatch || entry.my_peer_id != peer_conn.get_my_peer_id() { if new_added { @@ -730,13 +781,18 @@ impl ForeignNetworkManager { entry.my_peer_id, peer_conn.get_my_peer_id() ) + } else if credential_identity_mismatch { + anyhow::anyhow!( + "credential-trusted foreign peer has invalid identity type: {:?}", + peer_identity_type + ) } else { anyhow::anyhow!( - "foreign peer identity not trusted. exp: {:?} real: {:?}, remote_pubkey_len: {}, shared_trusted: {}", + "foreign peer identity not trusted. exp: {:?} real: {:?}, remote_pubkey_len: {}, credential_trusted: {}", entry.network, peer_network, conn_info.noise_remote_static_pubkey.len(), - shared_peer_trusted, + credential_peer_trusted, ) }; tracing::error!(?err, "foreign network entry not match, disconnect peer"); @@ -911,7 +967,7 @@ pub mod tests { set_global_var, tunnel::common::tests::wait_for_condition, }; - use std::time::Duration; + use std::{collections::HashMap, time::Duration}; use super::*; @@ -933,6 +989,20 @@ pub mod tests { peer_mgr } + async fn create_mock_credential_peer_manager_for_foreign_network( + network: &str, + ) -> Arc { + let (s, _r) = create_packet_recv_chan(); + let global_ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new_credential( + network.to_string(), + ))); + set_secure_mode_cfg(&global_ctx, true); + let peer_mgr = Arc::new(PeerManager::new(RouteAlgoType::Ospf, global_ctx, s)); + replace_stun_info_collector(peer_mgr.clone(), NatType::Unknown); + peer_mgr.run().await.unwrap(); + peer_mgr + } + pub async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc { create_mock_peer_manager_for_foreign_network_ext(network, network).await } @@ -1027,6 +1097,86 @@ pub mod tests { .is_empty()); } + #[tokio::test] + async fn credential_pubkey_trust_requires_ospf_credential_source() { + let global_ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new( + "__access__".to_string(), + "access_secret".to_string(), + ))); + let foreign_network = NetworkIdentity::new("net1".to_string(), "net1_secret".to_string()); + let (pm_packet_sender, _pm_packet_recv) = create_packet_recv_chan(); + let entry = ForeignNetworkEntry::new( + foreign_network.clone(), + 1, + global_ctx.clone(), + false, + Arc::new(PeerSessionStore::new()), + pm_packet_sender, + ); + let pubkey = vec![7; 32]; + + entry.global_ctx.update_trusted_keys( + HashMap::from([( + pubkey.clone(), + crate::common::global_ctx::TrustedKeyMetadata { + source: TrustedKeySource::OspfNode, + expiry_unix: None, + }, + )]), + &foreign_network.network_name, + ); + assert!(!ForeignNetworkManager::is_credential_pubkey_trusted(&entry, &pubkey).await); + + entry.global_ctx.update_trusted_keys( + HashMap::from([( + pubkey.clone(), + crate::common::global_ctx::TrustedKeyMetadata { + source: TrustedKeySource::OspfCredential, + expiry_unix: None, + }, + )]), + &foreign_network.network_name, + ); + assert!(ForeignNetworkManager::is_credential_pubkey_trusted(&entry, &pubkey).await); + } + + #[test] + fn credential_trust_path_rejects_admin_identity() { + assert!(ForeignNetworkManager::should_reject_credential_trust_path( + PeerIdentityType::Admin + )); + assert!(!ForeignNetworkManager::should_reject_credential_trust_path( + PeerIdentityType::Credential + )); + assert!(!ForeignNetworkManager::should_reject_credential_trust_path( + PeerIdentityType::SharedNode + )); + } + + #[tokio::test] + async fn zero_digest_peer_cannot_bootstrap_foreign_network() { + let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + set_secure_mode_cfg(&pm_center.get_global_ctx(), true); + + let pma_net1 = create_mock_credential_peer_manager_for_foreign_network("net1").await; + + let (a_ring, b_ring) = crate::tunnel::ring::create_ring_tunnel_pair(); + let a_mgr_copy = pma_net1.clone(); + let client = tokio::spawn(async move { a_mgr_copy.add_client_tunnel(a_ring, false).await }); + let b_mgr_copy = pm_center.clone(); + let server = + tokio::spawn(async move { b_mgr_copy.add_tunnel_as_server(b_ring, true).await }); + + assert!(client.await.unwrap().is_ok()); + assert!(server.await.unwrap().is_err()); + assert!(pm_center + .get_foreign_network_manager() + .list_foreign_networks() + .await + .foreign_networks + .is_empty()); + } + async fn foreign_network_whitelist_helper(name: String) { let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; tracing::debug!("pm_center: {:?}", pm_center.my_peer_id()); diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index 376f793b..fbc15eb6 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crossbeam::atomic::AtomicCell; use dashmap::{DashMap, DashSet}; +use parking_lot::RwLock; use tokio::{select, sync::mpsc}; @@ -42,6 +43,7 @@ pub struct Peer { default_conn_id: Arc>, peer_identity_type: Arc>>, + peer_public_key: Arc>>>, default_conn_id_clear_task: ScopedTask<()>, } @@ -56,6 +58,8 @@ impl Peer { let shutdown_notifier = Arc::new(tokio::sync::Notify::new()); let peer_identity_type = Arc::new(AtomicCell::new(None)); let peer_identity_type_copy = peer_identity_type.clone(); + let peer_public_key = Arc::new(RwLock::new(None)); + let peer_public_key_copy = peer_public_key.clone(); let conns_copy = conns.clone(); let shutdown_notifier_copy = shutdown_notifier.clone(); @@ -82,6 +86,7 @@ impl Peer { shrink_dashmap(&conns_copy, Some(4)); if conns_copy.is_empty() { peer_identity_type_copy.store(None); + *peer_public_key_copy.write() = None; } } } @@ -126,6 +131,7 @@ impl Peer { shutdown_notifier, default_conn_id, peer_identity_type, + peer_public_key, default_conn_id_clear_task, } } @@ -146,6 +152,22 @@ impl Peer { let close_notifier = conn.get_close_notifier(); let conn_info = conn.get_conn_info(); + let conn_pubkey = conn_info.noise_remote_static_pubkey.clone(); + { + let mut peer_pubkey = self.peer_public_key.write(); + if let Some(existing_pubkey) = peer_pubkey.as_ref() { + if existing_pubkey != &conn_pubkey { + return Err(Error::SecretKeyError(format!( + "peer public key mismatch. peer_id: {}, existing_len: {}, new_len: {}", + self.peer_node_id, + existing_pubkey.len(), + conn_pubkey.len() + ))); + } + } else { + *peer_pubkey = Some(conn_pubkey); + } + } conn.start_recv_loop(self.packet_recv_chan.clone()).await; conn.start_pingpong(); @@ -226,10 +248,14 @@ impl Peer { ret } + pub fn has_live_conns(&self) -> bool { + self.conns.iter().any(|entry| !entry.value().is_closed()) + } + pub fn has_directly_connected_conn(&self) -> bool { self.conns .iter() - .any(|entry| !(entry.value()).is_hole_punched()) + .any(|entry| !entry.value().is_closed() && !entry.value().is_hole_punched()) } pub fn get_directly_connections(&self) -> DashSet { @@ -247,6 +273,10 @@ impl Peer { pub fn get_peer_identity_type(&self) -> Option { self.peer_identity_type.load() } + + pub fn get_peer_public_key(&self) -> Option> { + self.peer_public_key.read().clone() + } } // pritn on drop @@ -458,4 +488,82 @@ mod tests { let ret = peer.add_peer_conn(admin_client_conn).await; assert!(ret.is_err()); } + + #[tokio::test] + async fn reject_peer_conn_with_mismatched_public_key() { + let (packet_send, _packet_recv) = create_packet_recv_chan(); + let local_peer_id = new_peer_id(); + let remote_peer_id = new_peer_id(); + let peer = Peer::new(remote_peer_id, packet_send, get_mock_global_ctx()); + let ps = Arc::new(PeerSessionStore::new()); + + let (client_tunnel_1, server_tunnel_1) = create_ring_tunnel_pair(); + let client_ctx_1 = get_mock_global_ctx(); + let server_ctx_1 = get_mock_global_ctx(); + client_ctx_1 + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + server_ctx_1 + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + set_secure_mode_cfg(&client_ctx_1, true); + set_secure_mode_cfg(&server_ctx_1, true); + let mut client_conn_1 = PeerConn::new( + local_peer_id, + client_ctx_1, + Box::new(client_tunnel_1), + ps.clone(), + ); + let mut server_conn_1 = PeerConn::new( + remote_peer_id, + server_ctx_1, + Box::new(server_tunnel_1), + ps.clone(), + ); + let (c1, s1) = tokio::join!( + client_conn_1.do_handshake_as_client(), + server_conn_1.do_handshake_as_server() + ); + c1.unwrap(); + s1.unwrap(); + + let (client_tunnel_2, server_tunnel_2) = create_ring_tunnel_pair(); + let client_ctx_2 = get_mock_global_ctx(); + let server_ctx_2 = get_mock_global_ctx(); + client_ctx_2 + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + server_ctx_2 + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + set_secure_mode_cfg(&client_ctx_2, true); + set_secure_mode_cfg(&server_ctx_2, true); + let mut client_conn_2 = PeerConn::new( + local_peer_id, + client_ctx_2, + Box::new(client_tunnel_2), + Arc::new(PeerSessionStore::new()), + ); + let mut server_conn_2 = PeerConn::new( + remote_peer_id, + server_ctx_2, + Box::new(server_tunnel_2), + Arc::new(PeerSessionStore::new()), + ); + let (c2, s2) = tokio::join!( + client_conn_2.do_handshake_as_client(), + server_conn_2.do_handshake_as_server() + ); + c2.unwrap(); + s2.unwrap(); + + let pubkey_1 = client_conn_1.get_conn_info().noise_remote_static_pubkey; + let pubkey_2 = client_conn_2.get_conn_info().noise_remote_static_pubkey; + assert_ne!(pubkey_1, pubkey_2); + + peer.add_peer_conn(client_conn_1).await.unwrap(); + assert_eq!(peer.get_peer_public_key(), Some(pubkey_1)); + let ret = peer.add_peer_conn(client_conn_2).await; + assert!(ret.is_err()); + } } diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index b2327c72..36725fd1 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -455,6 +455,10 @@ impl PeerConn { self.is_hole_punched } + pub fn is_closed(&self) -> bool { + self.close_event_notifier.is_closed() + } + async fn wait_handshake(&self, need_retry: &mut bool) -> Result { *need_retry = false; @@ -687,9 +691,9 @@ impl PeerConn { /// | Admin | Admin | same network_secret, proof verified | NetworkSecretConfirmed | NetworkSecretConfirmed | Admin | Admin | /// | Credential | Admin | client pubkey is trusted by admin | EncryptedUnauthenticated | PeerVerified | Admin | Credential | /// | Credential | Admin | client pubkey is unknown | handshake may fail | handshake reject | unknown | unknown | - /// | Admin | SharedNode | pinned key match | PeerVerified | EncryptedUnauthenticated | SharedNode | SharedNode | - /// | Admin | SharedNode | local has no pinned key requirement | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | SharedNode | - /// | Credential | SharedNode | no pin and not trusted | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | SharedNode | + /// | Admin | SharedNode | pinned key match | PeerVerified | EncryptedUnauthenticated | SharedNode | Admin | + /// | Admin | SharedNode | local has no pinned key requirement | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | Admin | + /// | Credential | SharedNode | no pin and not trusted | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | Credential | /// | Credential | Credential | should reject | handshake reject | handshake reject | unknown | unknown | /// /// Logic (in priority order): @@ -764,20 +768,27 @@ impl PeerConn { secure_auth_level: SecureAuthLevel, remote_role_hint_is_same_network: bool, remote_sent_secret_proof: bool, + is_client: bool, ) -> PeerIdentityType { if !remote_role_hint_is_same_network || remote_network_name != self.global_ctx.get_network_name() { - return PeerIdentityType::SharedNode; - } + if is_client { + PeerIdentityType::SharedNode + } else if remote_sent_secret_proof { + PeerIdentityType::Admin + } else { + PeerIdentityType::Credential + } + } else { + if matches!(secure_auth_level, SecureAuthLevel::NetworkSecretConfirmed) + || remote_sent_secret_proof + { + return PeerIdentityType::Admin; + } - if matches!(secure_auth_level, SecureAuthLevel::NetworkSecretConfirmed) - || remote_sent_secret_proof - { - return PeerIdentityType::Admin; + PeerIdentityType::Credential } - - PeerIdentityType::Credential } async fn do_noise_handshake_as_client(&self) -> Result { @@ -920,6 +931,7 @@ impl PeerConn { secure_auth_level, msg2_pb.role_hint == 1, remote_sent_secret_proof, + true, ); let handshake_hash = hs.get_handshake_hash().to_vec(); @@ -1174,6 +1186,7 @@ impl PeerConn { secure_auth_level, role_hint == 1, msg3_pb.secret_proof_32.is_some(), + false, ); let handshake_hash = hs.get_handshake_hash().to_vec(); @@ -1948,7 +1961,7 @@ pub mod tests { ); assert_eq!( s_peer.get_conn_info().peer_identity_type, - PeerIdentityType::SharedNode as i32, + PeerIdentityType::Admin as i32, ); } @@ -1999,7 +2012,7 @@ pub mod tests { ); assert_eq!( s_peer.get_conn_info().peer_identity_type, - PeerIdentityType::SharedNode as i32, + PeerIdentityType::Admin as i32, ); } diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 687f85d4..28fe540a 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -819,17 +819,15 @@ impl PeerManager { tracing::error!(?e, "decrypt failed"); continue; } - } else if !peers.has_peer(from_peer_id) - && !foreign_client.has_next_hop(from_peer_id) - { + } else if hdr.is_encrypted() { match relay_peer_map.decrypt_if_needed(&mut ret).await { Ok(true) => {} Ok(false) => { - tracing::error!("relay session not found"); + tracing::error!("secure session not found"); continue; } Err(e) => { - tracing::error!(?e, "relay decrypt failed"); + tracing::error!(?e, "secure decrypt failed"); continue; } } @@ -904,6 +902,16 @@ impl PeerManager { async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option { let hdr = packet.peer_manager_header().unwrap(); if hdr.packet_type == PacketType::Data as u8 && !hdr.is_not_send_to_tun() { + if hdr.is_encrypted() || hdr.is_compressed() { + tracing::warn!( + from_peer_id = hdr.from_peer_id.get(), + to_peer_id = hdr.to_peer_id.get(), + encrypted = hdr.is_encrypted(), + compressed = hdr.is_compressed(), + "dropping packet before nic because it is not fully decoded" + ); + return None; + } tracing::trace!(?packet, "send packet to nic channel"); // TODO: use a function to get the body ref directly for zero copy let _ = self.nic_channel.send(packet).await; @@ -989,6 +997,11 @@ impl PeerManager { } } + async fn get_peer_public_key(&self, peer_id: PeerId) -> Option> { + let peer_map = self.peers.upgrade()?; + peer_map.get_peer_public_key(peer_id) + } + async fn get_peer_identity_type(&self, peer_id: PeerId) -> Option { let peer_map = self.peers.upgrade()?; peer_map.get_peer_identity_type(peer_id) diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index 5b055a81..6de1103e 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -278,13 +278,9 @@ impl PeerMap { pub async fn list_peers_with_conn(&self) -> Vec { let mut ret = Vec::new(); - let peers = self.list_peers(); - for peer_id in peers.iter() { - let Some(peer) = self.get_peer_by_id(*peer_id) else { - continue; - }; - if !peer.list_peer_conns().await.is_empty() { - ret.push(*peer_id); + for item in self.peer_map.iter() { + if item.value().has_live_conns() { + ret.push(*item.key()); } } ret @@ -308,6 +304,11 @@ impl PeerMap { .and_then(|p| p.get_peer_identity_type()) } + pub fn get_peer_public_key(&self, peer_id: PeerId) -> Option> { + self.get_peer_by_id(peer_id) + .and_then(|p| p.get_peer_public_key()) + } + pub async fn close_peer_conn( &self, peer_id: PeerId, diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 386c00ab..d104dfd1 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -32,8 +32,12 @@ use tokio::{ use crate::{ common::{ - config::NetworkIdentity, constants::EASYTIER_VERSION, global_ctx::ArcGlobalCtx, - shrink_dashmap, stun::StunInfoCollectorTrait, PeerId, + config::NetworkIdentity, + constants::EASYTIER_VERSION, + global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, + shrink_dashmap, + stun::StunInfoCollectorTrait, + PeerId, }, peers::route_trait::{Route, RouteInterfaceBox}, proto::{ @@ -66,6 +70,8 @@ use super::{ PeerPacketFilter, }; +use atomic_shim::AtomicU64; + static SERVICE_ID: u32 = 7; static UPDATE_PEER_INFO_PERIOD: Duration = Duration::from_secs(3600); static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660); @@ -371,6 +377,13 @@ impl Default for RouteConnInfo { } } +#[derive(Debug, Default)] +struct InterfacePeerSnapshot { + generation: u64, + peers: BTreeSet, + identity_types: BTreeMap>, +} + // constructed with all infos synced from all peers. struct SyncedRouteInfo { peer_infos: RwLock>, @@ -1049,14 +1062,12 @@ impl SyncedRouteInfo { .unwrap_or(false) } - fn get_credential_info(&self, peer_id: PeerId) -> Option { - let peer_infos = self.peer_infos.read(); - let info = peer_infos.get(&peer_id)?; - if info.noise_static_pubkey.is_empty() { + fn get_credential_info_by_pubkey(&self, peer_pubkey: &[u8]) -> Option { + if peer_pubkey.is_empty() { return None; } self.trusted_credential_pubkeys - .get(&info.noise_static_pubkey) + .get(peer_pubkey) .map(|r| r.value().clone()) } } @@ -1800,6 +1811,9 @@ struct PeerRouteServiceImpl { synced_route_info: SyncedRouteInfo, cached_local_conn_map: std::sync::Mutex, cached_local_conn_map_version: AtomicVersion, + cached_interface_peer_snapshot: std::sync::Mutex>, + interface_peers_generation: AtomicU64, + applied_interface_peers_generation: AtomicU64, last_update_my_foreign_network: AtomicCell>, @@ -1858,6 +1872,11 @@ impl PeerRouteServiceImpl { }, cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::default()), cached_local_conn_map_version: AtomicVersion::new(), + cached_interface_peer_snapshot: std::sync::Mutex::new(Arc::new( + InterfacePeerSnapshot::default(), + )), + interface_peers_generation: AtomicU64::new(1), + applied_interface_peers_generation: AtomicU64::new(0), last_update_my_foreign_network: AtomicCell::new(None), @@ -1904,15 +1923,60 @@ impl PeerRouteServiceImpl { self.sessions.iter().map(|x| *x.key()).collect() } + pub fn mark_interface_peers_dirty(&self) { + self.interface_peers_generation + .fetch_add(1, Ordering::Relaxed); + } + + async fn interface_peer_snapshot_uncached(&self) -> InterfacePeerSnapshot { + let interface = self.interface.lock().await; + let interface = interface.as_ref().unwrap(); + + let peers: BTreeSet<_> = interface.list_peers().await.into_iter().collect(); + let mut identity_types = BTreeMap::new(); + for peer_id in peers.iter().copied() { + identity_types.insert(peer_id, interface.get_peer_identity_type(peer_id).await); + } + + InterfacePeerSnapshot { + generation: 0, + peers, + identity_types, + } + } + + async fn interface_peer_snapshot(&self) -> Arc { + loop { + let start_generation = self.interface_peers_generation.load(Ordering::Acquire); + { + let cached = self.cached_interface_peer_snapshot.lock().unwrap(); + if cached.generation == start_generation { + return cached.clone(); + } + } + + let mut snapshot = self.interface_peer_snapshot_uncached().await; + let end_generation = self.interface_peers_generation.load(Ordering::Acquire); + if start_generation == end_generation { + snapshot.generation = end_generation; + let snapshot = Arc::new(snapshot); + *self.cached_interface_peer_snapshot.lock().unwrap() = snapshot.clone(); + return snapshot; + } + } + } + + async fn list_peers_from_interface_snapshot(&self) -> (u64, BTreeSet) { + let snapshot = self.interface_peer_snapshot().await; + (snapshot.generation, snapshot.peers.clone()) + } + async fn list_peers_from_interface>(&self) -> T { - self.interface - .lock() + self.interface_peer_snapshot() .await - .as_ref() - .unwrap() - .list_peers() - .await - .into_iter() + .peers + .iter() + .copied() .collect() } @@ -1920,6 +1984,11 @@ impl PeerRouteServiceImpl { &self, peer_id: PeerId, ) -> Option { + let snapshot = self.interface_peer_snapshot().await; + if let Some(identity_type) = snapshot.identity_types.get(&peer_id) { + return *identity_type; + } + self.interface .lock() .await @@ -1929,6 +1998,16 @@ impl PeerRouteServiceImpl { .await } + async fn get_peer_public_key_from_interface(&self, peer_id: PeerId) -> Option> { + self.interface + .lock() + .await + .as_ref() + .unwrap() + .get_peer_public_key(peer_id) + .await + } + fn update_my_peer_info(&self) -> bool { self.synced_route_info.update_my_peer_info( self.my_peer_id, @@ -1938,9 +2017,33 @@ impl PeerRouteServiceImpl { } async fn update_my_conn_info(&self) -> bool { - let connected_peers: BTreeSet = self.list_peers_from_interface().await; - self.synced_route_info - .update_my_conn_info(self.my_peer_id, connected_peers) + let current_generation = self.interface_peers_generation.load(Ordering::Acquire); + let generation_applied = self + .applied_interface_peers_generation + .load(Ordering::Acquire) + == current_generation; + if generation_applied { + let need_periodic_requery = self + .interface + .lock() + .await + .as_ref() + .map(|x| x.need_periodic_requery_peers()) + .unwrap_or(false); + if !need_periodic_requery { + return false; + } + + self.mark_interface_peers_dirty(); + } + + let (generation, connected_peers) = self.list_peers_from_interface_snapshot().await; + let updated = self + .synced_route_info + .update_my_conn_info(self.my_peer_id, connected_peers); + self.applied_interface_peers_generation + .store(generation, Ordering::Release); + updated } async fn update_my_foreign_network(&self) -> bool { @@ -2048,6 +2151,18 @@ impl PeerRouteServiceImpl { .unwrap_or(false) } + fn handle_global_ctx_event(&self, event: &GlobalCtxEvent) { + if matches!( + event, + GlobalCtxEvent::PeerAdded(_) + | GlobalCtxEvent::PeerRemoved(_) + | GlobalCtxEvent::PeerConnAdded(_) + | GlobalCtxEvent::PeerConnRemoved(_) + ) { + self.mark_interface_peers_dirty(); + } + } + fn update_route_table_and_cached_local_conn_bitmap(&self) { self.update_peer_info_last_update(); @@ -2283,15 +2398,7 @@ impl PeerRouteServiceImpl { let my_foreign_network_updated = self.update_my_foreign_network().await; let mut untrusted_changed = false; if my_peer_info_updated { - let network_identity = self.global_ctx.get_network_identity(); - let network_secret = network_identity.network_secret.as_deref(); - let (untrusted, global_trusted_keys) = self - .synced_route_info - .verify_and_update_credential_trusts(network_secret); - self.global_ctx - .update_trusted_keys(global_trusted_keys, &network_identity.network_name); - self.disconnect_untrusted_peers(&untrusted).await; - untrusted_changed = !untrusted.is_empty(); + untrusted_changed = self.refresh_credential_trusts_and_disconnect().await; } if my_peer_info_updated || my_conn_info_updated || untrusted_changed { @@ -2304,6 +2411,18 @@ impl PeerRouteServiceImpl { my_peer_info_updated || my_conn_info_updated || my_foreign_network_updated } + async fn refresh_credential_trusts_and_disconnect(&self) -> bool { + let network_identity = self.global_ctx.get_network_identity(); + let network_secret = network_identity.network_secret.as_deref(); + let (untrusted, global_trusted_keys) = self + .synced_route_info + .verify_and_update_credential_trusts(network_secret); + self.global_ctx + .update_trusted_keys(global_trusted_keys, &network_identity.network_name); + self.disconnect_untrusted_peers(&untrusted).await; + !untrusted.is_empty() + } + async fn disconnect_untrusted_peers(&self, untrusted_peers: &[PeerId]) { if untrusted_peers.is_empty() { return; @@ -2364,7 +2483,7 @@ impl PeerRouteServiceImpl { } } - fn clear_expired_peer(&self) { + async fn clear_expired_peer(&self) { let now = SystemTime::now(); let mut to_remove = Vec::new(); for (peer_id, peer_info) in self.synced_route_info.peer_infos.read().iter() { @@ -2404,6 +2523,7 @@ impl PeerRouteServiceImpl { self.synced_route_info.foreign_network.remove(p); } + self.refresh_credential_trusts_and_disconnect().await; self.route_table.clean_expired_route_info(); self.route_table_with_cost.clean_expired_route_info(); } @@ -2789,10 +2909,9 @@ impl RouteSessionManager { _ = recv.recv() => {} } - let mut peers = service_impl.list_peers_from_interface::>().await; - peers.sort(); - - let session_peers = self.list_session_peers(); + let interface_snapshot = service_impl.interface_peer_snapshot().await; + let peers = &interface_snapshot.peers; + let session_peers = self.list_session_peer_set(); for peer_id in session_peers.iter() { if !peers.contains(peer_id) { if Some(*peer_id) == cur_dst_peer_id_to_initiate { @@ -2808,9 +2927,11 @@ impl RouteSessionManager { // Step 9a: Filter OSPF session candidates based on direct auth level. // - Credential nodes only initiate sessions to admin nodes (not other credential nodes) // - Admin nodes don't initiate sessions to credential nodes - let identity_type = service_impl - .get_peer_identity_type_from_interface(peer_id) - .await + let identity_type = interface_snapshot + .identity_types + .get(&peer_id) + .copied() + .flatten() .unwrap_or(PeerIdentityType::Admin); if matches!(identity_type, PeerIdentityType::Credential) { continue; @@ -2896,6 +3017,14 @@ impl RouteSessionManager { service_impl.list_session_peers() } + fn list_session_peer_set(&self) -> BTreeSet { + let Some(service_impl) = self.service_impl.upgrade() else { + return BTreeSet::new(); + }; + + service_impl.list_session_peers().into_iter().collect() + } + fn dump_sessions(&self) -> Result { let Some(service_impl) = self.service_impl.upgrade() else { return Err(Error::Stopped); @@ -2919,6 +3048,33 @@ impl RouteSessionManager { tracing::debug!(?ret, ?reason, "sync_now_broadcast.send"); } + fn extract_credential_peer_info( + &self, + from_peer_id: PeerId, + peer_infos: &[RoutePeerInfo], + raw_peer_infos: &[DynamicMessage], + credential: &TrustedCredentialPubkey, + ) -> Option<(RoutePeerInfo, DynamicMessage)> { + let info_idx = peer_infos.iter().position(|p| p.peer_id == from_peer_id)?; + let mut info = peer_infos[info_idx].clone(); + let mut raw_info = raw_peer_infos[info_idx].clone(); + let allowed_cidrs = &credential.allowed_proxy_cidrs; + // Filter proxy_cidrs to only those allowed by credential + if !allowed_cidrs.is_empty() { + info.proxy_cidrs.retain(|cidr| { + allowed_cidrs + .iter() + .any(|allowed| cidr_is_subset_str(cidr, allowed)) + }); + } else { + // No allowed_proxy_cidrs → no proxy_cidrs allowed + info.proxy_cidrs.clear(); + } + SyncedRouteInfo::mark_credential_peer(&mut info, true); + patch_raw_from_info(&mut raw_info, &info, &["proxy_cidrs", "feature_flag"]); + Some((info, raw_info)) + } + #[allow(clippy::too_many_arguments)] async fn do_sync_route_info( &self, @@ -2942,7 +3098,22 @@ impl RouteSessionManager { .await .unwrap_or(PeerIdentityType::Admin); let from_is_credential = matches!(from_identity_type, PeerIdentityType::Credential); - let from_is_shared = matches!(from_identity_type, PeerIdentityType::SharedNode); + let credential_info = if from_is_credential { + service_impl + .get_peer_public_key_from_interface(from_peer_id) + .await + .and_then(|pubkey| { + service_impl + .synced_route_info + .get_credential_info_by_pubkey(&pubkey) + }) + } else { + None + }; + if from_is_credential && credential_info.is_none() { + // no credential found + return Err(Error::Stopped); + } let _session_lock = session.lock.lock(); @@ -2955,108 +3126,44 @@ impl RouteSessionManager { if let Some(peer_infos) = &peer_infos { // Step 9b: credential peers can only propagate their own route info - let normalize_raw = |info: &RoutePeerInfo| { - let mut raw = DynamicMessage::new(RoutePeerInfo::default().descriptor()); - raw.transcode_from(info).unwrap(); - raw - }; - let normalized_peer_infos: Vec; - let normalized_raw_peer_infos: Vec; + // patch_raw_from_info(&mut raw, info, &["proxy_cidrs", "feature_flag"]); let (pi, rpi) = if from_is_credential { - let allowed_cidrs = service_impl - .synced_route_info - .get_credential_info(from_peer_id) - .map(|tc| tc.allowed_proxy_cidrs.clone()) - .unwrap_or_default(); - normalized_peer_infos = peer_infos - .iter() - .filter(|info| info.peer_id == from_peer_id) - .cloned() - .map(|mut info| { - // Filter proxy_cidrs to only those allowed by credential - if !allowed_cidrs.is_empty() { - info.proxy_cidrs.retain(|cidr| { - allowed_cidrs - .iter() - .any(|allowed| cidr_is_subset_str(cidr, allowed)) - }); - } else { - // No allowed_proxy_cidrs → no proxy_cidrs allowed - info.proxy_cidrs.clear(); - } - SyncedRouteInfo::mark_credential_peer(&mut info, true); - info - }) - .collect(); - normalized_raw_peer_infos = normalized_peer_infos - .iter() - .map(|info| { - // Find original raw for this peer to preserve unknown fields - let orig_idx = peer_infos.iter().position(|p| p.peer_id == info.peer_id); - let mut raw = orig_idx - .and_then(|idx| raw_peer_infos.as_ref().map(|rpi| rpi[idx].clone())) - .unwrap_or_else(|| normalize_raw(info)); - patch_raw_from_info(&mut raw, info, &["proxy_cidrs", "feature_flag"]); - raw - }) - .collect(); - (&normalized_peer_infos, &normalized_raw_peer_infos) + if let Some(ret) = self.extract_credential_peer_info( + from_peer_id, + peer_infos, + raw_peer_infos.as_deref().unwrap(), + credential_info.as_ref().unwrap(), + ) { + (&vec![ret.0], &vec![ret.1]) + } else { + (&vec![], &vec![]) + } } else { - let mut peer_infos_mut = peer_infos.clone(); - let mut raw_peer_infos_mut = raw_peer_infos - .as_ref() - .cloned() - .unwrap_or_else(|| peer_infos_mut.iter().map(normalize_raw).collect()); - if from_is_shared { - for (info, raw) in peer_infos_mut.iter_mut().zip(raw_peer_infos_mut.iter_mut()) - { - info.trusted_credential_pubkeys.clear(); - patch_raw_from_info(raw, info, &["trusted_credential_pubkeys"]); - } - } - if let Some((idx, info)) = peer_infos_mut - .iter() - .enumerate() - .find(|(_, info)| info.peer_id == from_peer_id) - { - let mut info = info.clone(); - SyncedRouteInfo::mark_credential_peer(&mut info, false); - peer_infos_mut[idx] = info.clone(); - patch_raw_from_info(&mut raw_peer_infos_mut[idx], &info, &["feature_flag"]); - } - normalized_peer_infos = peer_infos_mut; - normalized_raw_peer_infos = raw_peer_infos_mut; - (&normalized_peer_infos, &normalized_raw_peer_infos) + (peer_infos, raw_peer_infos.as_ref().unwrap()) }; - - service_impl.synced_route_info.update_peer_infos( - my_peer_id, - service_impl.my_peer_route_id, - from_peer_id, - pi, - rpi, - )?; - service_impl - .synced_route_info - .verify_and_update_group_trusts( + if !pi.is_empty() { + service_impl.synced_route_info.update_peer_infos( + my_peer_id, + service_impl.my_peer_route_id, + from_peer_id, pi, - &service_impl.global_ctx.get_acl_group_declarations(), - ); - session.update_dst_saved_peer_info_version(pi, from_peer_id); - need_update_route_table = true; + rpi, + )?; + service_impl + .synced_route_info + .verify_and_update_group_trusts( + pi, + &service_impl.global_ctx.get_acl_group_declarations(), + ); + session.update_dst_saved_peer_info_version(pi, from_peer_id); + need_update_route_table = true; + } } // Step 9b: credential peers' conn_info depends on allow_relay flag if let Some(conn_info) = &conn_info { - let accept_conn_info = if from_is_credential { - service_impl - .synced_route_info - .get_credential_info(from_peer_id) - .map(|tc| tc.allow_relay) - .unwrap_or(false) - } else { - true - }; + let accept_conn_info = + !from_is_credential || credential_info.map(|tc| tc.allow_relay).unwrap_or(false); if accept_conn_info { service_impl.synced_route_info.update_conn_info(conn_info); session.update_dst_saved_conn_info_version(conn_info, from_peer_id); @@ -3162,7 +3269,7 @@ impl PeerRoute { async fn clear_expired_peer(service_impl: Arc) { loop { tokio::time::sleep(Duration::from_secs(60)).await; - service_impl.clear_expired_peer(); + service_impl.clear_expired_peer().await; // TODO: use debug log level for this. tracing::debug!(?service_impl, "clear_expired_peer"); } @@ -3180,6 +3287,7 @@ impl PeerRoute { session_mgr: RouteSessionManager, ) { let mut global_event_receiver = service_impl.global_ctx.subscribe(); + service_impl.mark_interface_peers_dirty(); loop { if service_impl.update_my_infos().await { session_mgr.sync_now("update_my_infos"); @@ -3193,6 +3301,12 @@ impl PeerRoute { select! { ev = global_event_receiver.recv() => { + if let Ok(ev_ref) = &ev { + service_impl.handle_global_ctx_event(ev_ref); + } else { + service_impl.mark_interface_peers_dirty(); + global_event_receiver = global_event_receiver.resubscribe(); + } tracing::info!(?ev, "global event received in update_my_peer_info_routine"); } _ = tokio::time::sleep(Duration::from_secs(1)) => {} @@ -3448,18 +3562,25 @@ impl PeerPacketFilter for Arc {} #[cfg(test)] mod tests { use std::{ - collections::BTreeSet, - sync::{atomic::Ordering, Arc}, - time::Duration, + collections::{BTreeSet, HashMap}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + time::{Duration, SystemTime}, }; use cidr::{Ipv4Cidr, Ipv4Inet, Ipv6Inet}; use dashmap::DashMap; + use parking_lot::Mutex; use prefix_trie::PrefixMap; use prost_reflect::{DynamicMessage, ReflectMessage}; use crate::{ - common::{global_ctx::tests::get_mock_global_ctx, PeerId}, + common::{ + global_ctx::{tests::get_mock_global_ctx, GlobalCtxEvent, TrustedKeySource}, + PeerId, + }, connector::udp_hole_punch::tests::replace_stun_info_collector, peers::{ create_packet_recv_chan, @@ -3479,11 +3600,12 @@ mod tests { }; use prost::Message; - use super::PeerRoute; + use super::{PeerRoute, REMOVE_DEAD_PEER_INFO_AFTER}; struct AuthOnlyInterface { my_peer_id: PeerId, identity_type: DashMap, + peer_public_key: DashMap>, } #[async_trait::async_trait] @@ -3496,11 +3618,183 @@ mod tests { self.my_peer_id } + async fn get_peer_public_key(&self, peer_id: PeerId) -> Option> { + self.peer_public_key + .get(&peer_id) + .map(|x| x.value().clone()) + } + async fn get_peer_identity_type(&self, peer_id: PeerId) -> Option { self.identity_type.get(&peer_id).map(|x| *x.value()) } } + struct TrackingInterface { + my_peer_id: PeerId, + closed_peers: Arc>>, + } + + #[async_trait::async_trait] + impl RouteInterface for TrackingInterface { + async fn list_peers(&self) -> Vec { + Vec::new() + } + + fn my_peer_id(&self) -> PeerId { + self.my_peer_id + } + + async fn close_peer(&self, peer_id: PeerId) { + self.closed_peers.lock().push(peer_id); + } + } + + struct CountingInterface { + my_peer_id: PeerId, + peers: Arc>>, + peer_identity_types: Arc>>>, + list_peers_calls: Arc, + get_peer_identity_type_calls: Arc, + } + + #[async_trait::async_trait] + impl RouteInterface for CountingInterface { + async fn list_peers(&self) -> Vec { + self.list_peers_calls.fetch_add(1, Ordering::Relaxed); + self.peers.lock().clone() + } + + async fn get_peer_identity_type(&self, peer_id: PeerId) -> Option { + self.get_peer_identity_type_calls + .fetch_add(1, Ordering::Relaxed); + self.peer_identity_types + .lock() + .get(&peer_id) + .copied() + .flatten() + } + + fn my_peer_id(&self) -> PeerId { + self.my_peer_id + } + } + + #[tokio::test] + async fn interface_peer_cache_refreshes_only_when_marked_dirty() { + let service_impl = PeerRouteServiceImpl::new(1, get_mock_global_ctx()); + let peers = Arc::new(Mutex::new(vec![2, 3])); + let peer_identity_types = Arc::new(Mutex::new(HashMap::new())); + let list_peers_calls = Arc::new(AtomicU32::new(0)); + let get_peer_identity_type_calls = Arc::new(AtomicU32::new(0)); + *service_impl.interface.lock().await = Some(Box::new(CountingInterface { + my_peer_id: 1, + peers: peers.clone(), + peer_identity_types, + list_peers_calls: list_peers_calls.clone(), + get_peer_identity_type_calls, + })); + + let first: BTreeSet<_> = service_impl.list_peers_from_interface().await; + let second: BTreeSet<_> = service_impl.list_peers_from_interface().await; + + assert_eq!(first, BTreeSet::from([2, 3])); + assert_eq!(second, BTreeSet::from([2, 3])); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1); + + *peers.lock() = vec![2, 4]; + service_impl.handle_global_ctx_event(&GlobalCtxEvent::PeerConnAdded(Default::default())); + + let third: BTreeSet<_> = service_impl.list_peers_from_interface().await; + assert_eq!(third, BTreeSet::from([2, 4])); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2); + } + + #[tokio::test] + async fn update_my_conn_info_skips_interface_scan_when_topology_is_unchanged() { + let service_impl = PeerRouteServiceImpl::new(1, get_mock_global_ctx()); + let peers = Arc::new(Mutex::new(vec![2, 3])); + let peer_identity_types = Arc::new(Mutex::new(HashMap::new())); + let list_peers_calls = Arc::new(AtomicU32::new(0)); + let get_peer_identity_type_calls = Arc::new(AtomicU32::new(0)); + *service_impl.interface.lock().await = Some(Box::new(CountingInterface { + my_peer_id: 1, + peers: peers.clone(), + peer_identity_types, + list_peers_calls: list_peers_calls.clone(), + get_peer_identity_type_calls: get_peer_identity_type_calls.clone(), + })); + + assert!(service_impl.update_my_conn_info().await); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1); + assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 2); + + assert!(!service_impl.update_my_conn_info().await); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1); + assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 2); + + *peers.lock() = vec![2, 4]; + service_impl.handle_global_ctx_event(&GlobalCtxEvent::PeerConnRemoved(Default::default())); + + assert!(service_impl.update_my_conn_info().await); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2); + assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 4); + + assert!(!service_impl.update_my_conn_info().await); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2); + assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 4); + } + + #[tokio::test] + async fn get_peer_identity_type_reuses_snapshot_until_topology_changes() { + let service_impl = PeerRouteServiceImpl::new(1, get_mock_global_ctx()); + let peers = Arc::new(Mutex::new(vec![2, 3])); + let peer_identity_types = Arc::new(Mutex::new(HashMap::from([ + (2, Some(PeerIdentityType::Credential)), + (3, Some(PeerIdentityType::Admin)), + (4, Some(PeerIdentityType::Admin)), + ]))); + let list_peers_calls = Arc::new(AtomicU32::new(0)); + let get_peer_identity_type_calls = Arc::new(AtomicU32::new(0)); + *service_impl.interface.lock().await = Some(Box::new(CountingInterface { + my_peer_id: 1, + peers: peers.clone(), + peer_identity_types: peer_identity_types.clone(), + list_peers_calls: list_peers_calls.clone(), + get_peer_identity_type_calls: get_peer_identity_type_calls.clone(), + })); + + assert_eq!( + service_impl.get_peer_identity_type_from_interface(2).await, + Some(PeerIdentityType::Credential) + ); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1); + assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 2); + + assert_eq!( + service_impl.get_peer_identity_type_from_interface(2).await, + Some(PeerIdentityType::Credential) + ); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1); + assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 2); + + *peers.lock() = vec![2, 4]; + service_impl.handle_global_ctx_event(&GlobalCtxEvent::PeerConnRemoved(Default::default())); + + assert_eq!( + service_impl.get_peer_identity_type_from_interface(4).await, + Some(PeerIdentityType::Admin) + ); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2); + assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 4); + + assert_eq!( + service_impl.get_peer_identity_type_from_interface(4).await, + Some(PeerIdentityType::Admin) + ); + assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2); + assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 4); + } + async fn create_mock_route(peer_mgr: Arc) -> Arc { let peer_route = PeerRoute::new( peer_mgr.my_peer_id(), @@ -3654,13 +3948,29 @@ mod tests { let route = create_mock_route(peer_mgr.clone()).await; let from_peer_id: PeerId = 10001; let forwarded_peer_id: PeerId = 10002; + let credential_pubkey = vec![3u8; 32]; let identity_type = DashMap::new(); identity_type.insert(from_peer_id, PeerIdentityType::Credential); + let peer_public_key = DashMap::new(); + peer_public_key.insert(from_peer_id, credential_pubkey.clone()); *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { my_peer_id: peer_mgr.my_peer_id(), identity_type, + peer_public_key, })); + route + .service_impl + .synced_route_info + .trusted_credential_pubkeys + .insert( + credential_pubkey.clone(), + TrustedCredentialPubkey { + pubkey: credential_pubkey, + expiry_unix: i64::MAX, + ..Default::default() + }, + ); let mut sender_info = RoutePeerInfo::new(); sender_info.peer_id = from_peer_id; @@ -3703,6 +4013,7 @@ mod tests { assert!(guard.get(&forwarded_peer_id).is_none()); } + // shared node doesn't have hmac. #[tokio::test] async fn sync_route_info_shared_sender_cannot_publish_trusted_credentials() { let peer_mgr = create_mock_pmgr().await; @@ -3716,6 +4027,7 @@ mod tests { *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { my_peer_id: peer_mgr.my_peer_id(), identity_type, + peer_public_key: DashMap::new(), })); let mut sender_info = RoutePeerInfo::new(); @@ -3755,13 +4067,6 @@ mod tests { .await .unwrap(); - let guard = route.service_impl.synced_route_info.peer_infos.read(); - assert!(guard - .get(&forwarded_peer_id) - .map(|x| x.trusted_credential_pubkeys.is_empty()) - .unwrap_or(false)); - drop(guard); - assert!(!route .service_impl .synced_route_info @@ -3770,60 +4075,83 @@ mod tests { } #[tokio::test] - async fn sync_route_info_forces_non_credential_for_legacy_admin_sender() { - let peer_mgr = create_mock_pmgr().await; - let route = create_mock_route(peer_mgr.clone()).await; - let from_peer_id: PeerId = 10011; - let other_peer_id: PeerId = 10012; + async fn clear_expired_peer_recomputes_trust_after_last_admin_disappears() { + let service_impl = PeerRouteServiceImpl::new(1, get_mock_global_ctx()); + let admin_peer_id: PeerId = 10051; + let credential_peer_id: PeerId = 10052; + let admin_pubkey = vec![5u8; 32]; + let credential_pubkey = vec![6u8; 32]; + let network_name = service_impl + .global_ctx + .get_network_identity() + .network_name + .clone(); + let now = SystemTime::now(); + let closed_peers = Arc::new(Mutex::new(Vec::new())); - let identity_type = DashMap::new(); - identity_type.insert(from_peer_id, PeerIdentityType::Admin); - *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { - my_peer_id: peer_mgr.my_peer_id(), - identity_type, + *service_impl.interface.lock().await = Some(Box::new(TrackingInterface { + my_peer_id: service_impl.my_peer_id, + closed_peers: closed_peers.clone(), })); - let mut sender_info = RoutePeerInfo::new(); - sender_info.peer_id = from_peer_id; - sender_info.version = 1; - sender_info.feature_flag = Some(PeerFeatureFlag { - is_credential_peer: true, - ..Default::default() - }); + { + let mut guard = service_impl.synced_route_info.peer_infos.write(); - let mut other_info = RoutePeerInfo::new(); - other_info.peer_id = other_peer_id; - other_info.version = 1; + let mut admin_info = RoutePeerInfo::new(); + admin_info.peer_id = admin_peer_id; + admin_info.version = 1; + admin_info.last_update = + Some((now - REMOVE_DEAD_PEER_INFO_AFTER - Duration::from_secs(1)).into()); + admin_info.noise_static_pubkey = admin_pubkey; + admin_info.trusted_credential_pubkeys = vec![TrustedCredentialPubkeyProof { + credential: Some(TrustedCredentialPubkey { + pubkey: credential_pubkey.clone(), + expiry_unix: i64::MAX, + ..Default::default() + }), + credential_hmac: vec![1; 32], + }]; - let make_raw = |info: &RoutePeerInfo| { - let mut raw = DynamicMessage::new(RoutePeerInfo::default().descriptor()); - raw.transcode_from(info).unwrap(); - raw - }; - let raw_infos = vec![make_raw(&sender_info), make_raw(&other_info)]; + let mut credential_info = RoutePeerInfo::new(); + credential_info.peer_id = credential_peer_id; + credential_info.version = 1; + credential_info.last_update = Some(now.into()); + credential_info.noise_static_pubkey = credential_pubkey.clone(); - route - .session_mgr - .do_sync_route_info( - from_peer_id, - 1, - true, - Some(vec![sender_info, other_info]), - Some(raw_infos), - None, - None, - ) - .await - .unwrap(); + guard.insert(admin_peer_id, admin_info); + guard.insert(credential_peer_id, credential_info); + } - let guard = route.service_impl.synced_route_info.peer_infos.read(); - let sender = guard.get(&from_peer_id).unwrap(); - assert!(!sender - .feature_flag - .as_ref() - .map(|x| x.is_credential_peer) - .unwrap_or(false)); - assert!(guard.get(&other_peer_id).is_some()); + let (_, global_trusted_keys) = service_impl + .synced_route_info + .verify_and_update_credential_trusts(None); + service_impl + .global_ctx + .update_trusted_keys(global_trusted_keys, &network_name); + + assert!(service_impl + .synced_route_info + .trusted_credential_pubkeys + .contains_key(&credential_pubkey)); + + service_impl.clear_expired_peer().await; + + assert!(!service_impl.global_ctx.is_pubkey_trusted_with_source( + &credential_pubkey, + &network_name, + TrustedKeySource::OspfCredential, + )); + assert!(closed_peers.lock().contains(&credential_peer_id)); + assert!(!service_impl + .synced_route_info + .peer_infos + .read() + .contains_key(&admin_peer_id)); + assert!(!service_impl + .synced_route_info + .peer_infos + .read() + .contains_key(&credential_peer_id)); } #[rstest::rstest] @@ -4453,13 +4781,29 @@ mod tests { let peer_mgr = create_mock_pmgr().await; let route = create_mock_route(peer_mgr.clone()).await; let from_peer_id: PeerId = 20001; + let credential_pubkey = vec![4u8; 32]; let identity_type = DashMap::new(); identity_type.insert(from_peer_id, PeerIdentityType::Credential); + let peer_public_key = DashMap::new(); + peer_public_key.insert(from_peer_id, credential_pubkey.clone()); *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { my_peer_id: peer_mgr.my_peer_id(), identity_type, + peer_public_key, })); + route + .service_impl + .synced_route_info + .trusted_credential_pubkeys + .insert( + credential_pubkey.clone(), + TrustedCredentialPubkey { + pubkey: credential_pubkey, + expiry_unix: i64::MAX, + ..Default::default() + }, + ); let mut sender_info = RoutePeerInfo::new(); sender_info.peer_id = from_peer_id; @@ -4505,6 +4849,7 @@ mod tests { *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { my_peer_id: peer_mgr.my_peer_id(), identity_type, + peer_public_key: DashMap::new(), })); let mut sender_info = RoutePeerInfo::new(); @@ -4575,6 +4920,7 @@ mod tests { *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { my_peer_id: peer_mgr.my_peer_id(), identity_type, + peer_public_key: DashMap::new(), })); let mut sender_info = RoutePeerInfo::new(); diff --git a/easytier/src/peers/peer_session.rs b/easytier/src/peers/peer_session.rs index decd6517..fceb0822 100644 --- a/easytier/src/peers/peer_session.rs +++ b/easytier/src/peers/peer_session.rs @@ -212,7 +212,12 @@ impl PeerSessionStore { .clone(); session.check_encrypt_algo_same(&send_algorithm, &recv_algorithm)?; session.check_or_set_peer_static_pubkey(peer_static_pubkey)?; - session.sync_root_key(root_key, b_session_generation, initial_epoch); + session.sync_root_key( + root_key, + b_session_generation, + initial_epoch, + matches!(action, PeerSessionAction::Sync), + ); Ok(session) } } @@ -352,6 +357,33 @@ impl EpochRxSlot { } } +#[derive(Debug, Clone, Copy, Default)] +struct SyncRxGrace { + slots: [[EpochRxSlot; 2]; 2], + expires_at_ms: u64, + valid: bool, +} + +impl SyncRxGrace { + fn clear(&mut self) { + self.slots = [[EpochRxSlot::default(), EpochRxSlot::default()]; 2]; + self.expires_at_ms = 0; + self.valid = false; + } + + fn refresh(&mut self, slots: [[EpochRxSlot; 2]; 2], expires_at_ms: u64) { + self.slots = slots; + self.expires_at_ms = expires_at_ms; + self.valid = true; + } + + fn maybe_expire(&mut self, now_ms: u64) { + if self.valid && now_ms >= self.expires_at_ms { + self.clear(); + } + } +} + pub struct PeerSession { peer_id: PeerId, root_key: RwLock<[u8; 32]>, @@ -365,6 +397,8 @@ pub struct PeerSession { rx_slots: Mutex<[[EpochRxSlot; 2]; 2]>, key_cache: Mutex<[[EpochKeySlot; 2]; 2]>, + sync_rx_grace: Mutex, + sync_rx_grace_expires_at_ms: AtomicU64, send_cipher_algorithm: String, recv_cipher_algorithm: String, @@ -389,6 +423,11 @@ impl std::fmt::Debug for PeerSession { .field("send_packets_since_epoch", &self.send_packets_since_epoch) .field("rx_slots", &self.rx_slots) .field("key_cache", &self.key_cache) + .field("sync_rx_grace", &self.sync_rx_grace) + .field( + "sync_rx_grace_expires_at_ms", + &self.sync_rx_grace_expires_at_ms, + ) .field("send_cipher_algorithm", &self.send_cipher_algorithm) .field("recv_cipher_algorithm", &self.recv_cipher_algorithm) .finish() @@ -405,6 +444,10 @@ impl PeerSession { /// traffic may want to increase this value; low-latency or tightly /// resource-constrained deployments may lower it. const EVICT_IDLE_AFTER_MS: u64 = 30_000; + /// Keep the pre-sync receive windows alive briefly so in-flight packets + /// from the previous epochs are still accepted after a shared session is + /// synced in place by another connection. + const SYNC_RX_GRACE_AFTER_MS: u64 = 5_000; /// Maximum number of packets to send in a single epoch before forcing /// a key/epoch rotation. @@ -458,6 +501,8 @@ impl PeerSession { send_packets_since_epoch: AtomicU64::new(0), rx_slots: Mutex::new(rx_slots), key_cache: Mutex::new(key_cache), + sync_rx_grace: Mutex::new(SyncRxGrace::default()), + sync_rx_grace_expires_at_ms: AtomicU64::new(0), send_cipher_algorithm, recv_cipher_algorithm, invalidated: AtomicBool::new(false), @@ -540,7 +585,15 @@ impl PeerSession { Ok(()) } - pub fn sync_root_key(&self, root_key: [u8; 32], session_generation: u32, initial_epoch: u32) { + pub fn sync_root_key( + &self, + root_key: [u8; 32], + session_generation: u32, + initial_epoch: u32, + preserve_rx_grace: bool, + ) { + let old_root_key = self.root_key(); + let can_preserve_rx_grace = preserve_rx_grace && old_root_key == root_key; { let mut g = self.root_key.write().unwrap(); *g = root_key; @@ -557,6 +610,16 @@ impl PeerSession { { let mut rx = self.rx_slots.lock().unwrap(); + let mut sync_rx_grace = self.sync_rx_grace.lock().unwrap(); + if can_preserve_rx_grace { + let expires_at_ms = now_ms().saturating_add(Self::SYNC_RX_GRACE_AFTER_MS); + sync_rx_grace.refresh(*rx, expires_at_ms); + self.sync_rx_grace_expires_at_ms + .store(expires_at_ms, Ordering::Relaxed); + } else { + sync_rx_grace.clear(); + self.sync_rx_grace_expires_at_ms.store(0, Ordering::Relaxed); + } for dir in 0..2 { rx[dir][0].clear(); rx[dir][1].clear(); @@ -598,21 +661,17 @@ impl PeerSession { key } - fn get_encryptor(&self, epoch: u32, dir: usize, is_send: bool) -> Option> { - let generation = self.session_generation(); - let rx = self.rx_slots.lock().unwrap(); - let send_epoch = self.send_epoch.load(Ordering::Relaxed); - let allowed = epoch == send_epoch - || rx[dir][0].valid && rx[dir][0].epoch == epoch - || rx[dir][1].valid && rx[dir][1].epoch == epoch; - if !allowed { - return None; - } - + fn get_or_create_encryptor( + &self, + epoch: u32, + dir: usize, + generation: u32, + is_send: bool, + ) -> Arc { let mut guard = self.key_cache.lock().unwrap(); for slot in guard[dir].iter_mut() { if slot.valid && slot.epoch == epoch && slot.generation == generation { - return Some(slot.get_encryptor(is_send)); + return slot.get_encryptor(is_send); } } @@ -635,7 +694,7 @@ impl PeerSession { guard[dir][1] = slot; } - Some(ret) + ret } fn maybe_rotate_epoch(&self, now_ms: u64) { @@ -698,9 +757,38 @@ impl PeerSession { } } + fn epoch_in_slots(slots: &[EpochRxSlot; 2], epoch: u32) -> bool { + slots[0].valid && slots[0].epoch == epoch || slots[1].valid && slots[1].epoch == epoch + } + + fn sync_rx_grace_active(&self, now_ms: u64) -> bool { + let expires_at_ms = self.sync_rx_grace_expires_at_ms.load(Ordering::Relaxed); + if expires_at_ms == 0 { + return false; + } + if now_ms < expires_at_ms { + return true; + } + self.sync_rx_grace_expires_at_ms.store(0, Ordering::Relaxed); + false + } + fn check_replay(&self, epoch: u32, seq: u64, dir: usize, now_ms: u64) -> bool { let mut rx = self.rx_slots.lock().unwrap(); Self::evict_old_rx_slots(&mut rx, now_ms); + let mut sync_rx_grace = if self.sync_rx_grace_active(now_ms) { + let mut sync_rx_grace = self.sync_rx_grace.lock().unwrap(); + sync_rx_grace.maybe_expire(now_ms); + if sync_rx_grace.valid { + Self::evict_old_rx_slots(&mut sync_rx_grace.slots, now_ms); + Some(sync_rx_grace) + } else { + self.sync_rx_grace_expires_at_ms.store(0, Ordering::Relaxed); + None + } + } else { + None + }; let send_epoch = self.send_epoch.load(Ordering::Relaxed); { let mut key_cache = self.key_cache.lock().unwrap(); @@ -712,7 +800,10 @@ impl PeerSession { let e = key_cache[d][s].epoch; let allowed = e == send_epoch || rx[d][0].valid && rx[d][0].epoch == e - || rx[d][1].valid && rx[d][1].epoch == e; + || rx[d][1].valid && rx[d][1].epoch == e + || sync_rx_grace + .as_ref() + .is_some_and(|g| Self::epoch_in_slots(&g.slots[d], e)); if !allowed { key_cache[d][s].valid = false; } @@ -720,6 +811,18 @@ impl PeerSession { } } + if sync_rx_grace + .as_ref() + .is_some_and(|g| Self::epoch_in_slots(&g.slots[dir], epoch)) + { + for slot in sync_rx_grace.as_mut().unwrap().slots[dir].iter_mut() { + if slot.valid && slot.epoch == epoch { + slot.last_rx_ms = now_ms; + return slot.window.accept(seq); + } + } + } + if !rx[dir][0].valid { rx[dir][0] = EpochRxSlot { epoch, @@ -777,9 +880,7 @@ impl PeerSession { } let dir = Self::dir_for_sender(sender_peer_id, receiver_peer_id); let (epoch, _seq, nonce_bytes) = self.next_nonce(dir); - let encryptor = self - .get_encryptor(epoch, dir, true) - .ok_or_else(|| anyhow!("no key for epoch"))?; + let encryptor = self.get_or_create_encryptor(epoch, dir, self.session_generation(), true); if let Err(e) = encryptor.encrypt_with_nonce(pkt, Some(nonce_bytes.as_slice())) { tracing::warn!( peer_id = ?self.peer_id, @@ -816,9 +917,7 @@ impl PeerSession { )); } - let encryptor = self - .get_encryptor(epoch, dir, false) - .ok_or_else(|| anyhow!("no key for epoch"))?; + let encryptor = self.get_or_create_encryptor(epoch, dir, self.session_generation(), false); if let Err(e) = encryptor.decrypt(ciphertext_with_tail) { let count = self.decrypt_fail_count.fetch_add(1, Ordering::Relaxed) + 1; if count >= Self::DECRYPT_FAIL_THRESHOLD { @@ -974,7 +1073,7 @@ mod tests { assert!(s.check_replay(0, 1, 0, now)); // Sync with initial_epoch=2 (simulating a Sync action) - s.sync_root_key(root_key, 2, 2); + s.sync_root_key(root_key, 2, 2, true); // Remote peer is still sending at epoch 0 — should be accepted // (rx_slots were cleared, so the first packet establishes the epoch) @@ -983,4 +1082,85 @@ mod tests { "packets at old epoch should be accepted after sync" ); } + + #[test] + fn sync_root_key_keeps_previous_epochs_during_grace_window() { + let peer_id: PeerId = 10; + let root_key = PeerSession::new_root_key(); + let s = PeerSession::new( + peer_id, + root_key, + 1, + 0, + "aes-256-gcm".to_string(), + "aes-256-gcm".to_string(), + None, + ); + + let now = now_ms(); + assert!(s.check_replay(0, 0, 0, now)); + assert!(s.check_replay(1, 0, 0, now + 1)); + + s.sync_root_key(root_key, 2, 2, true); + + // The first packet after sync may already use the new epoch. + assert!(s.check_replay(2, 0, 0, now + 2)); + // Older in-flight packets from pre-sync epochs should still be accepted + // during the grace period, regardless of arrival order. + assert!(s.check_replay(1, 1, 0, now + 3)); + assert!(s.check_replay(0, 1, 0, now + 4)); + } + + #[test] + fn sync_root_key_expires_previous_epochs_after_grace_window() { + let peer_id: PeerId = 10; + let root_key = PeerSession::new_root_key(); + let s = PeerSession::new( + peer_id, + root_key, + 1, + 0, + "aes-256-gcm".to_string(), + "aes-256-gcm".to_string(), + None, + ); + + let now = now_ms(); + assert!(s.check_replay(0, 0, 0, now)); + assert!(s.check_replay(1, 0, 0, now + 1)); + + s.sync_root_key(root_key, 2, 2, true); + assert!(s.check_replay(2, 0, 0, now + 2)); + + assert!( + !s.check_replay(0, 1, 0, now + PeerSession::SYNC_RX_GRACE_AFTER_MS + 3), + "old epochs should stop being accepted once the sync grace window expires" + ); + } + + #[test] + fn sync_root_key_does_not_preserve_previous_epochs_when_root_key_changes() { + let peer_id: PeerId = 10; + let root_key = PeerSession::new_root_key(); + let s = PeerSession::new( + peer_id, + root_key, + 1, + 0, + "aes-256-gcm".to_string(), + "aes-256-gcm".to_string(), + None, + ); + + let now = now_ms(); + assert!(s.check_replay(0, 0, 0, now)); + assert!(s.check_replay(1, 0, 0, now + 1)); + + s.sync_root_key(PeerSession::new_root_key(), 2, 2, true); + assert!(s.check_replay(2, 0, 0, now + 2)); + assert!( + !s.check_replay(1, 1, 0, now + 3), + "old epochs should not be preserved when sync replaces the root key" + ); + } } diff --git a/easytier/src/peers/route_trait.rs b/easytier/src/peers/route_trait.rs index 7dc0319e..b5b4d9b7 100644 --- a/easytier/src/peers/route_trait.rs +++ b/easytier/src/peers/route_trait.rs @@ -27,7 +27,13 @@ pub type ForeignNetworkRouteInfoMap = pub trait RouteInterface { async fn list_peers(&self) -> Vec; fn my_peer_id(&self) -> PeerId; + fn need_periodic_requery_peers(&self) -> bool { + false + } async fn close_peer(&self, _peer_id: PeerId) {} + async fn get_peer_public_key(&self, _peer_id: PeerId) -> Option> { + None + } async fn get_peer_identity_type(&self, _peer_id: PeerId) -> Option { None } diff --git a/easytier/src/proto/web.proto b/easytier/src/proto/web.proto index 0b283254..c28595e9 100644 --- a/easytier/src/proto/web.proto +++ b/easytier/src/proto/web.proto @@ -4,6 +4,12 @@ import "common.proto"; package web; +message DeviceOsInfo { + string os_type = 1; + string version = 2; + string distribution = 3; +} + message HeartbeatRequest { common.UUID machine_id = 1; common.UUID inst_id = 2; @@ -14,6 +20,7 @@ message HeartbeatRequest { string hostname = 6; repeated common.UUID running_network_instances = 7; + DeviceOsInfo device_os = 8; } message HeartbeatResponse {} diff --git a/easytier/src/tunnel/fake_tcp/mod.rs b/easytier/src/tunnel/fake_tcp/mod.rs index 8d606399..e4469459 100644 --- a/easytier/src/tunnel/fake_tcp/mod.rs +++ b/easytier/src/tunnel/fake_tcp/mod.rs @@ -57,6 +57,17 @@ fn get_faketcp_tunnel_type_str(driver_type: &str) -> String { format!("faketcp_{}", driver_type) } +async fn create_tun_off_runtime( + interface_name: String, + src_addr: Option, + dst_addr: SocketAddr, +) -> Result, TunnelError> { + tokio::task::spawn_blocking(move || create_tun(&interface_name, src_addr, dst_addr)) + .await + .map_err(|e| TunnelError::InternalError(format!("faketcp create_tun task failed: {e}")))? + .map_err(Into::into) +} + pub struct FakeTcpTunnelListener { addr: url::Url, os_listener: Option, @@ -137,7 +148,9 @@ impl FakeTcpTunnelListener { let ret = match self.stack_map.entry(interface_name.to_string()) { dashmap::Entry::Occupied(entry) => entry.get().clone(), dashmap::Entry::Vacant(entry) => { - let tun = create_tun(interface_name, None, local_socket_addr)?; + let tun = + create_tun_off_runtime(interface_name.to_string(), None, local_socket_addr) + .await?; tracing::info!( ?local_socket_addr, "create new stack with interface_name: {:?}", @@ -314,7 +327,8 @@ impl crate::tunnel::TunnelConnector for FakeTcpTunnelConnector { IpAddr::V6(ip) => (None, Some(ip)), }; - let tun = create_tun(&interface_name, Some(remote_addr), local_addr)?; + let tun = + create_tun_off_runtime(interface_name.clone(), Some(remote_addr), local_addr).await?; let local_ip = local_ip.unwrap_or("0.0.0.0".parse().unwrap()); let mut stack = stack::Stack::new(tun, local_ip, local_ip6, mac); let driver_type = stack.driver_type(); diff --git a/easytier/src/tunnel/fake_tcp/netfilter/linux_bpf.rs b/easytier/src/tunnel/fake_tcp/netfilter/linux_bpf.rs index 879c467e..b710fc15 100644 --- a/easytier/src/tunnel/fake_tcp/netfilter/linux_bpf.rs +++ b/easytier/src/tunnel/fake_tcp/netfilter/linux_bpf.rs @@ -367,7 +367,7 @@ fn read_packet_socket_stats(fd: i32) -> io::Result { } pub struct LinuxBpfTun { - fd: OwnedFd, + fd: Arc, ifindex: i32, stop: Arc, worker: Option>, @@ -395,7 +395,7 @@ impl LinuxBpfTun { if fd < 0 { return Err(io::Error::last_os_error()); } - let fd = unsafe { OwnedFd::from_raw_fd(fd) }; + let fd = Arc::new(unsafe { OwnedFd::from_raw_fd(fd) }); let mut addr: libc::sockaddr_ll = unsafe { mem::zeroed() }; addr.sll_family = libc::AF_PACKET as u16; @@ -404,7 +404,7 @@ impl LinuxBpfTun { let bind_ret = unsafe { libc::bind( - fd.as_raw_fd(), + fd.as_ref().as_raw_fd(), &addr as *const _ as *const libc::sockaddr, mem::size_of::() as u32, ) @@ -413,7 +413,7 @@ impl LinuxBpfTun { return Err(io::Error::last_os_error()); } - let actual_rcvbuf = set_socket_rcvbuf(fd.as_raw_fd(), DEFAULT_RCVBUF_BYTES)?; + let actual_rcvbuf = set_socket_rcvbuf(fd.as_ref().as_raw_fd(), DEFAULT_RCVBUF_BYTES)?; let filter = build_tcp_filter(src_addr, dst_addr)?; let mut prog = libc::sock_fprog { @@ -425,7 +425,7 @@ impl LinuxBpfTun { }; let opt_ret = unsafe { libc::setsockopt( - fd.as_raw_fd(), + fd.as_ref().as_raw_fd(), libc::SOL_SOCKET, libc::SO_ATTACH_FILTER, &mut prog as *mut _ as *mut libc::c_void, @@ -442,7 +442,7 @@ impl LinuxBpfTun { }; let _ = unsafe { libc::setsockopt( - fd.as_raw_fd(), + fd.as_ref().as_raw_fd(), libc::SOL_SOCKET, libc::SO_RCVTIMEO, &timeout as *const _ as *const libc::c_void, @@ -453,10 +453,13 @@ impl LinuxBpfTun { let stop = Arc::new(AtomicBool::new(false)); let (tx, rx) = tokio::sync::mpsc::channel(1024); let stop_clone = stop.clone(); - let read_fd = fd.as_raw_fd(); + let read_fd = fd.as_ref().as_raw_fd(); + let fd_guard = fd.clone(); let interface_name_for_worker = interface_name.to_string(); let worker = std::thread::spawn(move || { + // Keep the packet socket alive until the detached worker actually exits. + let _fd_guard = fd_guard; let mut buf = vec![0u8; 65536]; let mut stats_enabled = true; let mut total_packets: u64 = 0; @@ -562,9 +565,11 @@ impl LinuxBpfTun { impl Drop for LinuxBpfTun { fn drop(&mut self) { self.stop.store(true, AtomicOrdering::Relaxed); - let _ = unsafe { libc::shutdown(self.fd.as_raw_fd(), libc::SHUT_RD) }; + let _ = unsafe { libc::shutdown(self.fd.as_ref().as_raw_fd(), libc::SHUT_RD) }; if let Some(worker) = self.worker.take() { - let _ = worker.join(); + // Dropping the JoinHandle detaches the worker. The worker holds its own Arc + // clone, so the packet socket stays valid until recv wakes up and the thread exits. + drop(worker); } } } @@ -602,7 +607,7 @@ impl stack::Tun for LinuxBpfTun { let ret = unsafe { libc::sendto( - self.fd.as_raw_fd(), + self.fd.as_ref().as_raw_fd(), packet.as_ptr() as *const libc::c_void, packet.len(), 0, diff --git a/easytier/src/web_client/controller.rs b/easytier/src/web_client/controller.rs index c8c4c4b8..177d4576 100644 --- a/easytier/src/web_client/controller.rs +++ b/easytier/src/web_client/controller.rs @@ -1,13 +1,16 @@ use std::sync::Arc; use crate::{ - instance_manager::NetworkInstanceManager, proto::rpc_impl::service_registry::ServiceRegistry, - rpc_service::api::register_api_rpc_service, web_client::WebClientHooks, + instance_manager::NetworkInstanceManager, + proto::{rpc_impl::service_registry::ServiceRegistry, web::DeviceOsInfo}, + rpc_service::api::register_api_rpc_service, + web_client::WebClientHooks, }; pub struct Controller { token: String, hostname: String, + device_os: DeviceOsInfo, manager: Arc, hooks: Arc, } @@ -16,12 +19,14 @@ impl Controller { pub fn new( token: String, hostname: String, + device_os: DeviceOsInfo, manager: Arc, hooks: Arc, ) -> Self { Controller { token, hostname, + device_os, manager, hooks, } @@ -39,6 +44,10 @@ impl Controller { self.hostname.clone() } + pub fn device_os(&self) -> DeviceOsInfo { + self.device_os.clone() + } + pub fn register_api_rpc_service(&self, registry: &ServiceRegistry) { register_api_rpc_service(&self.manager, registry, Some(self.hooks.clone())); } diff --git a/easytier/src/web_client/mod.rs b/easytier/src/web_client/mod.rs index c2953079..47710a0c 100644 --- a/easytier/src/web_client/mod.rs +++ b/easytier/src/web_client/mod.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use crate::{ common::{ - config::TomlConfigLoader, global_ctx::GlobalCtx, log, scoped_task::ScopedTask, - set_default_machine_id, stun::MockStunInfoCollector, + config::TomlConfigLoader, global_ctx::GlobalCtx, log, os_info::collect_device_os_info, + scoped_task::ScopedTask, set_default_machine_id, stun::MockStunInfoCollector, }, connector::create_connector_by_url, instance_manager::{DaemonGuard, NetworkInstanceManager}, @@ -62,6 +62,7 @@ impl WebClient { let controller = Arc::new(controller::Controller::new( token.to_string(), hostname.to_string(), + collect_device_os_info(), manager, hooks, )); diff --git a/easytier/src/web_client/session.rs b/easytier/src/web_client/session.rs index 75de2df1..bf33bc65 100644 --- a/easytier/src/web_client/session.rs +++ b/easytier/src/web_client/session.rs @@ -69,6 +69,7 @@ impl Session { let inst_id = uuid::Uuid::new_v4(); let token = controller.upgrade().unwrap().token(); let hostname = controller.upgrade().unwrap().hostname(); + let device_os = controller.upgrade().unwrap().device_os(); let ctx_clone = ctx.clone(); let mut tick = interval(std::time::Duration::from_secs(1)); @@ -91,6 +92,7 @@ impl Session { easytier_version: EASYTIER_VERSION.to_string(), hostname: hostname.clone(), report_time: chrono::Local::now().to_rfc3339(), + device_os: Some(device_os.clone()), running_network_instances: controller .list_network_instance_ids()