diff --git a/easytier-contrib/easytier-ffi/src/lib.rs b/easytier-contrib/easytier-ffi/src/lib.rs index c4dc9b8a..8686e534 100644 --- a/easytier-contrib/easytier-ffi/src/lib.rs +++ b/easytier-contrib/easytier-ffi/src/lib.rs @@ -215,7 +215,7 @@ pub unsafe extern "C" fn collect_network_infos( if index >= max_length { break; } - let Some(key) = INSTANCE_MANAGER.get_network_instance_name(instance_id) else { + let Some(key) = INSTANCE_MANAGER.get_instance_name(instance_id) else { continue; }; // convert value to json string @@ -228,7 +228,7 @@ pub unsafe extern "C" fn collect_network_infos( }; infos[index] = KeyValuePair { - key: std::ffi::CString::new(key.clone()).unwrap().into_raw(), + key: std::ffi::CString::new(key).unwrap().into_raw(), value: std::ffi::CString::new(value).unwrap().into_raw(), }; index += 1; diff --git a/easytier-web/frontend-lib/src/types/network.ts b/easytier-web/frontend-lib/src/types/network.ts index a7d7a630..2f554dbc 100644 --- a/easytier-web/frontend-lib/src/types/network.ts +++ b/easytier-web/frontend-lib/src/types/network.ts @@ -6,6 +6,14 @@ export enum NetworkingMethod { Standalone = 2, } +export interface SecureModeConfig { + enabled: boolean + // Keep protocol compatibility with backend/import-export flows even though the GUI + // does not render secure-mode or credential inputs. + local_private_key?: string + local_public_key?: string +} + export interface NetworkConfig { instance_id: string @@ -14,7 +22,9 @@ export interface NetworkConfig { network_length: number hostname?: string network_name: string - network_secret: string + network_secret?: string + credential_file?: string + secure_mode?: SecureModeConfig networking_method: NetworkingMethod @@ -83,6 +93,7 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig { network_length: 24, network_name: 'easytier', network_secret: '', + credential_file: '', networking_method: NetworkingMethod.PublicServer, diff --git a/easytier-web/src/client_manager/mod.rs b/easytier-web/src/client_manager/mod.rs index 390daccb..69e9355f 100644 --- a/easytier-web/src/client_manager/mod.rs +++ b/easytier-web/src/client_manager/mod.rs @@ -19,6 +19,7 @@ use maxminddb::geoip2; use session::{Location, Session}; use storage::{Storage, StorageToken}; +use crate::webhook::SharedWebhookConfig; use crate::FeatureFlags; use tokio::task::JoinSet; @@ -59,12 +60,18 @@ pub struct ClientManager { storage: Storage, feature_flags: Arc, + webhook_config: SharedWebhookConfig, geoip_db: Arc>>>, } impl ClientManager { - pub fn new(db: Db, geoip_db: Option, feature_flags: Arc) -> Self { + pub fn new( + db: Db, + geoip_db: Option, + feature_flags: Arc, + webhook_config: SharedWebhookConfig, + ) -> Self { let client_sessions = Arc::new(DashMap::new()); let sessions: Arc>> = client_sessions.clone(); let mut tasks = JoinSet::new(); @@ -82,6 +89,7 @@ impl ClientManager { client_sessions, storage: Storage::new(db), feature_flags, + webhook_config, geoip_db: Arc::new(load_geoip_db(geoip_db)), } @@ -98,6 +106,7 @@ impl ClientManager { let listeners_cnt = self.listeners_cnt.clone(); let geoip_db = self.geoip_db.clone(); let feature_flags = self.feature_flags.clone(); + let webhook_config = self.webhook_config.clone(); self.tasks.spawn(async move { while let Ok(tunnel) = listener.accept().await { let (tunnel, secure) = match security::accept_or_upgrade_server_tunnel(tunnel).await { @@ -121,6 +130,7 @@ impl ClientManager { client_url.clone(), location, feature_flags.clone(), + webhook_config.clone(), ); session.serve(tunnel).await; sessions.insert(client_url, Arc::new(session)); @@ -165,6 +175,36 @@ impl ClientManager { .map(|item| item.value().clone()) } + /// Find a session by machine_id regardless of user_id. + pub fn get_session_by_machine_id_global( + &self, + machine_id: &uuid::Uuid, + ) -> Option> { + self.storage + .get_client_url_by_machine_id_global(machine_id) + .and_then(|url| { + self.client_sessions + .get(&url) + .map(|item| item.value().clone()) + }) + } + + /// Get user_id associated with a machine_id. + pub fn get_user_id_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> Option { + self.storage.get_user_id_by_machine_id_global(machine_id) + } + + pub async fn disconnect_session_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> bool { + let Some(client_url) = self.storage.get_client_url_by_machine_id_global(machine_id) else { + return false; + }; + let Some((_, session)) = self.client_sessions.remove(&client_url) else { + return false; + }; + session.stop().await; + true + } + pub async fn list_machine_by_user_id(&self, user_id: UserIdInDb) -> Vec { self.storage.list_user_clients(user_id) } @@ -321,6 +361,9 @@ mod tests { Db::memory_db().await, None, Arc::new(FeatureFlags::default()), + Arc::new(crate::webhook::WebhookConfig::new( + None, None, None, None, None, + )), ); mgr.add_listener(Box::new(listener)).await.unwrap(); diff --git a/easytier-web/src/client_manager/session.rs b/easytier-web/src/client_manager/session.rs index 7d88da86..c6e14b23 100644 --- a/easytier-web/src/client_manager/session.rs +++ b/easytier-web/src/client_manager/session.rs @@ -18,6 +18,7 @@ use easytier::{ use tokio::sync::{broadcast, RwLock}; use super::storage::{Storage, StorageToken, WeakRefStorage}; +use crate::webhook::SharedWebhookConfig; use crate::FeatureFlags; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -31,9 +32,11 @@ pub struct Location { pub struct SessionData { storage: WeakRefStorage, feature_flags: Arc, + webhook_config: SharedWebhookConfig, client_url: url::Url, storage_token: Option, + binding_version: Option, notifier: broadcast::Sender, req: Option, location: Option, @@ -45,14 +48,17 @@ impl SessionData { client_url: url::Url, location: Option, feature_flags: Arc, + webhook_config: SharedWebhookConfig, ) -> Self { let (tx, _rx1) = broadcast::channel(2); SessionData { storage, feature_flags, + webhook_config, client_url, storage_token: None, + binding_version: None, notifier: tx, req: None, location, @@ -77,6 +83,23 @@ impl Drop for SessionData { if let Ok(storage) = Storage::try_from(self.storage.clone()) { if let Some(token) = self.storage_token.as_ref() { storage.remove_client(token); + + // Notify the webhook receiver when a node disconnects. + if self.webhook_config.is_enabled() { + let webhook = self.webhook_config.clone(); + let machine_id = token.machine_id.to_string(); + let web_instance_id = webhook.web_instance_id.clone(); + let binding_version = self.binding_version; + tokio::spawn(async move { + webhook + .notify_node_disconnected(&crate::webhook::NodeDisconnectedRequest { + machine_id, + web_instance_id, + binding_version, + }) + .await; + }); + } } } } @@ -90,6 +113,58 @@ struct SessionRpcService { } impl SessionRpcService { + async fn persist_webhook_network_config( + storage: &Storage, + user_id: i32, + machine_id: uuid::Uuid, + network_config: serde_json::Value, + ) -> anyhow::Result<()> { + let mut network_config = network_config; + let network_name = network_config + .get("network_name") + .and_then(|v| v.as_str()) + .filter(|v| !v.is_empty()) + .ok_or_else(|| anyhow::anyhow!("webhook response missing network_name"))? + .to_string(); + let existing_configs = storage + .db() + .list_network_configs((user_id, machine_id), ListNetworkProps::All) + .await + .map_err(|e| anyhow::anyhow!("failed to list existing network configs: {:?}", e))?; + let inst_id = existing_configs + .iter() + .find_map(|cfg| { + let value = serde_json::from_str::(&cfg.network_config).ok()?; + let cfg_network_name = value.get("network_name")?.as_str()?; + if cfg_network_name == network_name { + uuid::Uuid::parse_str(&cfg.network_instance_id).ok() + } else { + None + } + }) + .unwrap_or_else(uuid::Uuid::new_v4); + + let config_obj = network_config + .as_object_mut() + .ok_or_else(|| anyhow::anyhow!("webhook network_config must be a JSON object"))?; + config_obj.insert( + "instance_id".to_string(), + serde_json::Value::String(inst_id.to_string()), + ); + config_obj + .entry("instance_name".to_string()) + .or_insert_with(|| serde_json::Value::String(network_name.clone())); + + let config = serde_json::from_value::(network_config)?; + storage + .db() + .insert_or_update_user_network_config((user_id, machine_id), inst_id, config) + .await + .map_err(|e| anyhow::anyhow!("failed to persist webhook network config: {:?}", e))?; + + Ok(()) + } + async fn handle_heartbeat( &self, req: HeartbeatRequest, @@ -106,28 +181,92 @@ impl SessionRpcService { req.machine_id ))?; - let user_id = match storage - .db() - .get_user_id_by_token(req.user_token.clone()) - .await - .with_context(|| { - format!( - "Failed to get user id by token from db: {:?}", + let (user_id, webhook_network_config, webhook_validated, binding_version) = if data + .webhook_config + .is_enabled() + { + let webhook_req = crate::webhook::ValidateTokenRequest { + token: req.user_token.clone(), + machine_id: machine_id.to_string(), + hostname: req.hostname.clone(), + version: req.easytier_version.clone(), + web_instance_id: data.webhook_config.web_instance_id.clone(), + web_instance_api_base_url: data.webhook_config.web_instance_api_base_url.clone(), + }; + let resp = data + .webhook_config + .validate_token(&webhook_req) + .await + .map_err(|e| anyhow::anyhow!("Webhook token validation failed: {:?}", e))?; + + if resp.valid { + let user_id = match storage + .db() + .get_user_id_by_token(req.user_token.clone()) + .await + .map_err(|e| anyhow::anyhow!("DB error: {:?}", e))? + { + Some(id) => id, + None => storage + .auto_create_user(&req.user_token) + .await + .with_context(|| { + format!("Failed to auto-create webhook user: {:?}", req.user_token) + })?, + }; + ( + user_id, + resp.network_config, + true, + Some(resp.binding_version), + ) + } else { + return Err(anyhow::anyhow!( + "Webhook rejected token for machine {:?}: {:?}", + machine_id, req.user_token ) - })? { - Some(id) => id, - None if data.feature_flags.allow_auto_create_user => storage - .auto_create_user(&req.user_token) - .await - .with_context(|| format!("Failed to auto-create user: {:?}", req.user_token))?, - None => { - return Err( - anyhow::anyhow!("User not found by token: {:?}", req.user_token).into(), - ); + .into()); } + } else { + let user_id = match storage + .db() + .get_user_id_by_token(req.user_token.clone()) + .await + .with_context(|| { + format!( + "Failed to get user id by token from db: {:?}", + req.user_token + ) + })? { + Some(id) => id, + None if data.feature_flags.allow_auto_create_user => storage + .auto_create_user(&req.user_token) + .await + .with_context(|| format!("Failed to auto-create user: {:?}", req.user_token))?, + None => { + return Err( + anyhow::anyhow!("User not found by token: {:?}", req.user_token).into(), + ); + } + }; + (user_id, None, false, None) }; + if webhook_validated { + if let Some(network_config) = webhook_network_config { + Self::persist_webhook_network_config(&storage, user_id, machine_id, network_config) + .await + .map_err(rpc_types::error::Error::from)?; + } + } else if webhook_network_config.is_some() { + return Err(anyhow::anyhow!( + "unexpected webhook network_config for non-webhook token {:?}", + req.user_token + ) + .into()); + } + if data.req.replace(req.clone()).is_none() { assert!(data.storage_token.is_none()); data.storage_token = Some(StorageToken { @@ -136,6 +275,23 @@ impl SessionRpcService { machine_id, user_id, }); + data.binding_version = binding_version; + + // Notify the webhook receiver on the first successful heartbeat. + if data.webhook_config.is_enabled() { + let webhook = data.webhook_config.clone(); + let connect_req = crate::webhook::NodeConnectedRequest { + machine_id: machine_id.to_string(), + token: req.user_token.clone(), + hostname: req.hostname.clone(), + version: req.easytier_version.clone(), + web_instance_id: webhook.web_instance_id.clone(), + binding_version, + }; + tokio::spawn(async move { + webhook.notify_node_connected(&connect_req).await; + }); + } } let Ok(report_time) = chrono::DateTime::::from_str(&req.report_time) else { @@ -203,8 +359,10 @@ impl Session { client_url: url::Url, location: Option, feature_flags: Arc, + webhook_config: SharedWebhookConfig, ) -> Self { - let session_data = SessionData::new(storage, client_url, location, feature_flags); + let session_data = + SessionData::new(storage, client_url, location, feature_flags, webhook_config); let data = Arc::new(RwLock::new(session_data)); let rpc_mgr = @@ -335,6 +493,10 @@ impl Session { self.rpc_mgr.is_running() } + pub async fn stop(&self) { + self.rpc_mgr.stop().await; + } + pub fn data(&self) -> SharedSessionData { self.data.clone() } diff --git a/easytier-web/src/client_manager/storage.rs b/easytier-web/src/client_manager/storage.rs index 223a1988..7e63d740 100644 --- a/easytier-web/src/client_manager/storage.rs +++ b/easytier-web/src/client_manager/storage.rs @@ -22,6 +22,7 @@ struct ClientInfo { #[derive(Debug)] pub struct StorageInner { user_clients_map: DashMap>, + global_machine_map: DashMap, pub db: Db, } @@ -41,22 +42,19 @@ impl Storage { pub fn new(db: Db) -> Self { Storage(Arc::new(StorageInner { user_clients_map: DashMap::new(), + global_machine_map: DashMap::new(), db, })) } - fn remove_mid_to_client_info_map( - map: &DashMap, - machine_id: &uuid::Uuid, - client_url: &url::Url, - ) { - map.remove_if(machine_id, |_, v| v.storage_token.client_url == *client_url); + fn remove_client_info_map(map: &DashMap, stoken: &StorageToken) { + map.remove_if(&stoken.machine_id, |_, v| { + v.storage_token.client_url == stoken.client_url + && v.storage_token.user_id == stoken.user_id + }); } - fn update_mid_to_client_info_map( - map: &DashMap, - client_info: &ClientInfo, - ) { + fn update_client_info_map(map: &DashMap, client_info: &ClientInfo) { map.entry(client_info.storage_token.machine_id) .and_modify(|e| { if e.report_time < client_info.report_time { @@ -78,14 +76,16 @@ impl Storage { report_time, }; - Self::update_mid_to_client_info_map(&inner, &client_info); + Self::update_client_info_map(&inner, &client_info); + Self::update_client_info_map(&self.0.global_machine_map, &client_info); } pub fn remove_client(&self, stoken: &StorageToken) { + Self::remove_client_info_map(&self.0.global_machine_map, stoken); self.0 .user_clients_map .remove_if(&stoken.user_id, |_, set| { - Self::remove_mid_to_client_info_map(set, &stoken.machine_id, &stoken.client_url); + Self::remove_client_info_map(set, stoken); set.is_empty() }); } @@ -106,6 +106,22 @@ impl Storage { }) } + /// Find client_url by machine_id across all users. + pub fn get_client_url_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> Option { + self.0 + .global_machine_map + .get(machine_id) + .map(|info| info.storage_token.client_url.clone()) + } + + /// Find user_id by machine_id across all users. + pub fn get_user_id_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> Option { + self.0 + .global_machine_map + .get(machine_id) + .map(|info| info.storage_token.user_id) + } + pub fn list_user_clients(&self, user_id: UserIdInDb) -> Vec { self.0 .user_clients_map @@ -129,3 +145,57 @@ impl Storage { Ok(new_user.id) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn make_storage_token( + user_id: UserIdInDb, + machine_id: uuid::Uuid, + client_url: &str, + ) -> StorageToken { + StorageToken { + token: format!("token-{machine_id}"), + client_url: client_url.parse().unwrap(), + machine_id, + user_id, + } + } + + #[tokio::test] + async fn global_machine_index_uses_latest_report_and_ignores_stale_removal() { + let storage = Storage::new(Db::memory_db().await); + let machine_id = uuid::Uuid::new_v4(); + + let old_token = make_storage_token(1, machine_id, "tcp://127.0.0.1:1001"); + let new_token = make_storage_token(1, machine_id, "tcp://127.0.0.1:1002"); + + storage.update_client(old_token.clone(), 10); + storage.update_client(new_token.clone(), 20); + + assert_eq!( + storage.get_client_url_by_machine_id_global(&machine_id), + Some(new_token.client_url.clone()) + ); + assert_eq!( + storage.get_user_id_by_machine_id_global(&machine_id), + Some(1) + ); + + storage.remove_client(&old_token); + + assert_eq!( + storage.get_client_url_by_machine_id_global(&machine_id), + Some(new_token.client_url.clone()) + ); + + storage.remove_client(&new_token); + + assert_eq!( + storage.get_client_url_by_machine_id_global(&machine_id), + None + ); + assert_eq!(storage.get_user_id_by_machine_id_global(&machine_id), None); + } +} diff --git a/easytier-web/src/main.rs b/easytier-web/src/main.rs index 31666883..dd50e965 100644 --- a/easytier-web/src/main.rs +++ b/easytier-web/src/main.rs @@ -26,6 +26,7 @@ mod client_manager; mod db; mod migrator; mod restful; +mod webhook; #[cfg(feature = "embed")] mod web; @@ -132,6 +133,34 @@ struct Cli { #[command(flatten)] oidc: restful::oidc::OidcOptions, + + #[command(flatten)] + webhook: WebhookOptions, +} + +#[derive(Debug, Clone, Default, clap::Args)] +pub struct WebhookOptions { + /// Base URL of the webhook endpoint for token validation and event delivery. + /// When set, incoming tokens are validated via this webhook before local fallback. + #[arg(long)] + pub webhook_url: Option, + + /// Shared secret used to authenticate outbound webhook calls. + #[arg(long)] + pub webhook_secret: Option, + + /// Token for X-Internal-Auth header. When set, API requests with this header + /// bypass session authentication. + #[arg(long)] + pub internal_auth_token: Option, + + /// Stable identifier for this easytier-web instance when routing webhook callbacks. + #[arg(long)] + pub web_instance_id: Option, + + /// Reachable base URL for this easytier-web instance's internal REST API. + #[arg(long)] + pub web_instance_api_base_url: Option, } #[derive(Debug, Clone, Default, clap::Args)] @@ -237,8 +266,19 @@ async fn main() { // let db = db::Db::new(":memory:").await.unwrap(); let db = db::Db::new(cli.db).await.unwrap(); let feature_flags = Arc::new(cli.feature_flags); - let mut mgr = - client_manager::ClientManager::new(db.clone(), cli.geoip_db, feature_flags.clone()); + let webhook_config = Arc::new(webhook::WebhookConfig::new( + cli.webhook.webhook_url, + cli.webhook.webhook_secret, + cli.webhook.internal_auth_token, + cli.webhook.web_instance_id, + cli.webhook.web_instance_api_base_url, + )); + let mut mgr = client_manager::ClientManager::new( + db.clone(), + cli.geoip_db, + feature_flags.clone(), + webhook_config.clone(), + ); let (v6_listener, v4_listener) = get_dual_stack_listener(&cli.config_server_protocol, cli.config_server_port) .await @@ -292,6 +332,7 @@ async fn main() { web_router_restful, feature_flags, oidc_config, + webhook_config, ) .await .unwrap() diff --git a/easytier-web/src/restful/mod.rs b/easytier-web/src/restful/mod.rs index a92c4dab..d87cf438 100644 --- a/easytier-web/src/restful/mod.rs +++ b/easytier-web/src/restful/mod.rs @@ -7,8 +7,11 @@ mod users; use std::{net::SocketAddr, sync::Arc}; -use axum::http::StatusCode; -use axum::routing::post; +use axum::extract::Path; +use axum::http::{header, Request, StatusCode}; +use axum::middleware::{self as axum_mw, Next}; +use axum::response::Response; +use axum::routing::{delete, post}; use axum::{extract::State, routing::get, Extension, Json, Router}; use axum_login::tower_sessions::{ExpiredDeletion, SessionManagerLayer}; use axum_login::{login_required, AuthManagerLayerBuilder, AuthUser, AuthzBackend}; @@ -29,6 +32,7 @@ use users::{AuthSession, Backend}; use crate::client_manager::storage::StorageToken; use crate::client_manager::ClientManager; use crate::db::Db; +use crate::webhook::SharedWebhookConfig; use crate::FeatureFlags; /// Embed assets for web dashboard, build frontend first @@ -41,12 +45,9 @@ pub struct RestfulServer { bind_addr: SocketAddr, client_mgr: Arc, feature_flags: Arc, + webhook_config: SharedWebhookConfig, db: Db, oidc_config: oidc::OidcConfig, - - // serve_task: Option>, - // delete_task: Option>>, - // network_api: NetworkApi, web_router: Option, } @@ -111,20 +112,17 @@ impl RestfulServer { web_router: Option, feature_flags: Arc, oidc_config: oidc::OidcConfig, + webhook_config: SharedWebhookConfig, ) -> anyhow::Result { assert!(client_mgr.is_running()); - // let network_api = NetworkApi::new(); - Ok(RestfulServer { bind_addr, client_mgr, feature_flags, + webhook_config, db, oidc_config, - // serve_task: None, - // delete_task: None, - // network_api, web_router, }) } @@ -245,7 +243,31 @@ impl RestfulServer { .zstd(true) .quality(tower_http::compression::CompressionLevel::Default); - let app = Router::new() + // Token-authenticated management routes that bypass session auth. + let internal_app = if self.webhook_config.has_internal_auth() { + let internal_token = self.webhook_config.internal_auth_token.clone().unwrap(); + let internal_routes = Router::new() + .route( + "/api/internal/sessions", + get(Self::handle_list_all_sessions_internal), + ) + .route( + "/api/internal/sessions/:machine-id", + delete(Self::handle_disconnect_session_internal), + ) + .merge(NetworkApi::build_route_internal()) + .merge(rpc::router_internal()) + .with_state(self.client_mgr.clone()) + .layer(axum_mw::from_fn(move |req, next| { + let token = internal_token.clone(); + internal_auth_middleware(token, req, next) + })); + Some(internal_routes) + } else { + None + }; + + let mut app = Router::new() .route("/api/v1/summary", get(Self::handle_get_summary)) .route("/api/v1/sessions", get(Self::handle_list_all_sessions)) .merge(NetworkApi::build_route()) @@ -265,6 +287,10 @@ impl RestfulServer { .layer(tower_http::cors::CorsLayer::very_permissive()) .layer(compression_layer); + if let Some(internal_routes) = internal_app { + app = app.merge(internal_routes); + } + #[cfg(feature = "embed")] let app = if let Some(web_router) = self.web_router.take() { app.merge(web_router) @@ -279,4 +305,52 @@ impl RestfulServer { Ok((serve_task, delete_task)) } + + /// Session listing endpoint for token-authenticated management clients. + async fn handle_list_all_sessions_internal( + State(client_mgr): AppState, + ) -> Result, HttpHandleError> { + let ret = client_mgr.list_sessions().await; + Ok(ListSessionJsonResp(ret).into()) + } + + async fn handle_disconnect_session_internal( + Path(machine_id): Path, + State(client_mgr): AppState, + ) -> Result { + if client_mgr + .disconnect_session_by_machine_id_global(&machine_id) + .await + { + Ok(StatusCode::NO_CONTENT) + } else { + Err(( + StatusCode::NOT_FOUND, + other_error("session not found").into(), + )) + } + } +} + +/// Middleware that validates X-Internal-Auth for token-authenticated routes. +async fn internal_auth_middleware( + expected_token: String, + req: Request, + next: Next, +) -> Response { + let auth_header = req + .headers() + .get("X-Internal-Auth") + .and_then(|v| v.to_str().ok()); + + match auth_header { + Some(token) if token == expected_token => next.run(req).await, + _ => Response::builder() + .status(StatusCode::UNAUTHORIZED) + .header(header::CONTENT_TYPE, "application/json") + .body(axum::body::Body::from( + r#"{"error":"unauthorized: invalid or missing X-Internal-Auth header"}"#, + )) + .unwrap(), + } } diff --git a/easytier-web/src/restful/network.rs b/easytier-web/src/restful/network.rs index 7dc59fb5..9fdf772f 100644 --- a/easytier-web/src/restful/network.rs +++ b/easytier-web/src/restful/network.rs @@ -295,6 +295,87 @@ impl NetworkApi { .into()) } + // --- Token-authenticated machine-scoped handlers (no AuthSession) --- + + async fn handle_run_network_instance_internal( + State(client_mgr): AppState, + Path(machine_id): Path, + Json(payload): Json, + ) -> Result, HttpHandleError> { + let user_id = Self::get_user_id_from_machine(&client_mgr, &machine_id)?; + client_mgr + .handle_run_network_instance((user_id, machine_id), payload.config, payload.save) + .await + .map_err(convert_error)?; + Ok(Void::default().into()) + } + + async fn handle_remove_network_instance_internal( + State(client_mgr): AppState, + Path((machine_id, inst_id)): Path<(uuid::Uuid, uuid::Uuid)>, + ) -> Result<(), HttpHandleError> { + let user_id = Self::get_user_id_from_machine(&client_mgr, &machine_id)?; + client_mgr + .handle_remove_network_instances((user_id, machine_id), vec![inst_id]) + .await + .map_err(convert_error) + } + + async fn handle_list_network_instance_ids_internal( + State(client_mgr): AppState, + Path(machine_id): Path, + ) -> Result, HttpHandleError> { + let user_id = Self::get_user_id_from_machine(&client_mgr, &machine_id)?; + Ok(client_mgr + .handle_list_network_instance_ids((user_id, machine_id)) + .await + .map_err(convert_error)? + .into()) + } + + async fn handle_collect_network_info_internal( + State(client_mgr): AppState, + Path(machine_id): Path, + Json(payload): Json, + ) -> Result, HttpHandleError> { + let user_id = Self::get_user_id_from_machine(&client_mgr, &machine_id)?; + Ok(client_mgr + .handle_collect_network_info((user_id, machine_id), payload.inst_ids) + .await + .map_err(convert_error)? + .into()) + } + + /// Look up user_id from a machine's active session token. + fn get_user_id_from_machine( + client_mgr: &AppStateInner, + machine_id: &uuid::Uuid, + ) -> Result { + client_mgr + .get_user_id_by_machine_id_global(machine_id) + .ok_or(( + StatusCode::NOT_FOUND, + other_error("Machine not found").into(), + )) + } + + pub fn build_route_internal() -> Router { + Router::new() + .route( + "/api/internal/machines/:machine-id/networks", + post(Self::handle_run_network_instance_internal) + .get(Self::handle_list_network_instance_ids_internal), + ) + .route( + "/api/internal/machines/:machine-id/networks/:inst-id", + delete(Self::handle_remove_network_instance_internal), + ) + .route( + "/api/internal/machines/:machine-id/networks/info", + get(Self::handle_collect_network_info_internal), + ) + } + pub fn build_route() -> Router { Router::new() .route("/api/v1/machines", get(Self::handle_list_machines)) diff --git a/easytier-web/src/restful/rpc.rs b/easytier-web/src/restful/rpc.rs index f635114d..458bc642 100644 --- a/easytier-web/src/restful/rpc.rs +++ b/easytier-web/src/restful/rpc.rs @@ -5,6 +5,7 @@ use axum::{ Json, Router, }; use axum_login::AuthUser as _; +use easytier::proto::rpc_types::controller::BaseController; use super::{other_error, AppState, HttpHandleError}; @@ -19,34 +20,15 @@ macro_rules! match_service { ($factory:ty, $method_name:expr, $payload:expr, $session:expr) => {{ let client = $session.scoped_client::<$factory>(); client - .json_call_method( - easytier::proto::rpc_types::controller::BaseController::default(), - &$method_name, - $payload, - ) + .json_call_method(BaseController::default(), &$method_name, $payload) .await }}; } -pub async fn handle_proxy_rpc( - auth_session: super::users::AuthSession, - State(client_mgr): AppState, - Path(machine_id): Path, - Json(req): Json, +async fn handle_proxy_rpc_by_session( + session: &crate::client_manager::session::Session, + req: ProxyRpcRequest, ) -> Result, HttpHandleError> { - let user_id = auth_session - .user - .as_ref() - .ok_or((StatusCode::UNAUTHORIZED, other_error("Unauthorized").into()))? - .id(); - - let session = client_mgr - .get_session_by_machine_id(user_id, &machine_id) - .ok_or(( - StatusCode::NOT_FOUND, - other_error("Session not found").into(), - ))?; - let ProxyRpcRequest { service_name, method_name, @@ -55,97 +37,79 @@ pub async fn handle_proxy_rpc( let resp = match service_name.as_str() { "api.manage.WebClientService" => match_service!( - easytier::proto::api::manage::WebClientServiceClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::manage::WebClientServiceClientFactory, method_name, payload, session ), "api.instance.PeerManageRpcService" => match_service!( - easytier::proto::api::instance::PeerManageRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::instance::PeerManageRpcClientFactory, + method_name, + payload, + session + ), + "api.instance.PeerCenterManageRpcService" => match_service!( + easytier::proto::peer_rpc::PeerCenterRpcClientFactory, method_name, payload, session ), "api.instance.ConnectorManageRpcService" => match_service!( - easytier::proto::api::instance::ConnectorManageRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::instance::ConnectorManageRpcClientFactory, method_name, payload, session ), "api.instance.MappedListenerManageRpcService" => match_service!( - easytier::proto::api::instance::MappedListenerManageRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::instance::MappedListenerManageRpcClientFactory, method_name, payload, session ), "api.instance.VpnPortalRpcService" => match_service!( - easytier::proto::api::instance::VpnPortalRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::instance::VpnPortalRpcClientFactory, method_name, payload, session ), "api.instance.TcpProxyRpcService" => match_service!( - easytier::proto::api::instance::TcpProxyRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::instance::TcpProxyRpcClientFactory, method_name, payload, session ), "api.instance.AclManageRpcService" => match_service!( - easytier::proto::api::instance::AclManageRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::instance::AclManageRpcClientFactory, method_name, payload, session ), "api.instance.PortForwardManageRpcService" => match_service!( - easytier::proto::api::instance::PortForwardManageRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::instance::PortForwardManageRpcClientFactory, method_name, payload, session ), "api.instance.StatsRpcService" => match_service!( - easytier::proto::api::instance::StatsRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::instance::StatsRpcClientFactory, method_name, payload, session ), "api.instance.CredentialManageRpcService" => match_service!( - easytier::proto::api::instance::CredentialManageRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::instance::CredentialManageRpcClientFactory, method_name, payload, session ), "api.logger.LoggerRpcService" => match_service!( - easytier::proto::api::logger::LoggerRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::logger::LoggerRpcClientFactory, method_name, payload, session ), "api.config.ConfigRpcService" => match_service!( - easytier::proto::api::config::ConfigRpcClientFactory< - easytier::proto::rpc_types::controller::BaseController, - >, + easytier::proto::api::config::ConfigRpcClientFactory, method_name, payload, session @@ -167,9 +131,52 @@ pub async fn handle_proxy_rpc( } } +pub async fn handle_proxy_rpc( + auth_session: super::users::AuthSession, + State(client_mgr): AppState, + Path(machine_id): Path, + Json(req): Json, +) -> Result, HttpHandleError> { + let user_id = auth_session + .user + .as_ref() + .ok_or((StatusCode::UNAUTHORIZED, other_error("Unauthorized").into()))? + .id(); + + let session = client_mgr + .get_session_by_machine_id(user_id, &machine_id) + .ok_or(( + StatusCode::NOT_FOUND, + other_error("Session not found").into(), + ))?; + handle_proxy_rpc_by_session(session.as_ref(), req).await +} + pub fn router() -> Router { Router::new().route( "/api/v1/machines/:machine-id/proxy-rpc", post(handle_proxy_rpc), ) } + +/// Internal proxy-rpc handler: no AuthSession, resolves the active session by machine_id. +pub async fn handle_proxy_rpc_internal( + State(client_mgr): AppState, + Path(machine_id): Path, + Json(req): Json, +) -> Result, HttpHandleError> { + let session = client_mgr + .get_session_by_machine_id_global(&machine_id) + .ok_or(( + StatusCode::NOT_FOUND, + other_error("Session not found").into(), + ))?; + handle_proxy_rpc_by_session(session.as_ref(), req).await +} + +pub fn router_internal() -> Router { + Router::new().route( + "/api/internal/machines/:machine-id/proxy-rpc", + post(handle_proxy_rpc_internal), + ) +} diff --git a/easytier-web/src/webhook.rs b/easytier-web/src/webhook.rs new file mode 100644 index 00000000..86315512 --- /dev/null +++ b/easytier-web/src/webhook.rs @@ -0,0 +1,169 @@ +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +/// Webhook configuration for external integrations. +#[derive(Debug, Clone)] +pub struct WebhookConfig { + pub webhook_url: Option, + pub webhook_secret: Option, + pub internal_auth_token: Option, + pub web_instance_id: Option, + pub web_instance_api_base_url: Option, + + client: reqwest::Client, +} + +impl WebhookConfig { + pub fn new( + webhook_url: Option, + webhook_secret: Option, + internal_auth_token: Option, + web_instance_id: Option, + web_instance_api_base_url: Option, + ) -> Self { + WebhookConfig { + webhook_url, + webhook_secret, + internal_auth_token, + web_instance_id, + web_instance_api_base_url, + client: reqwest::Client::new(), + } + } + + pub fn is_enabled(&self) -> bool { + self.webhook_url + .as_deref() + .is_some_and(|url| !url.trim().is_empty()) + } + + pub fn has_internal_auth(&self) -> bool { + self.internal_auth_token.is_some() + } +} + +// --- Request/Response types --- + +#[derive(Debug, Serialize)] +pub struct ValidateTokenRequest { + pub token: String, + pub machine_id: String, + pub hostname: String, + pub version: String, + pub web_instance_id: Option, + pub web_instance_api_base_url: Option, +} + +#[derive(Debug, Deserialize)] +pub struct ValidateTokenResponse { + pub valid: bool, + #[serde(default)] + pub pre_approved: bool, + #[serde(default)] + pub binding_version: u64, + pub network_config: Option, +} + +#[derive(Debug, Serialize)] +pub struct NodeConnectedRequest { + pub machine_id: String, + pub token: String, + pub hostname: String, + pub version: String, + pub web_instance_id: Option, + pub binding_version: Option, +} + +#[derive(Debug, Serialize)] +pub struct NodeDisconnectedRequest { + pub machine_id: String, + pub web_instance_id: Option, + pub binding_version: Option, +} + +// --- Webhook client --- + +impl WebhookConfig { + fn webhook_base_url(&self) -> anyhow::Result<&str> { + self.webhook_url + .as_deref() + .map(str::trim) + .filter(|url| !url.is_empty()) + .ok_or_else(|| anyhow::anyhow!("webhook_url is not configured")) + } + + fn webhook_endpoint(&self, path: &str) -> anyhow::Result { + Ok(format!( + "{}/{}", + self.webhook_base_url()?.trim_end_matches('/'), + path.trim_start_matches('/'), + )) + } + + /// Validate a token through the configured webhook endpoint. + pub async fn validate_token( + &self, + req: &ValidateTokenRequest, + ) -> anyhow::Result { + let url = self.webhook_endpoint("validate-token")?; + let resp = self + .client + .post(&url) + .header("X-Internal-Auth", self.webhook_auth_secret()) + .json(req) + .send() + .await?; + + if !resp.status().is_success() { + anyhow::bail!("webhook validate-token returned status {}", resp.status()); + } + + Ok(resp.json().await?) + } + + /// Notify the webhook receiver that a node has connected. + pub async fn notify_node_connected(&self, req: &NodeConnectedRequest) { + if !self.is_enabled() { + return; + } + let Ok(url) = self.webhook_endpoint("webhook/node-connected") else { + tracing::warn!("skip node-connected webhook because webhook_url is not configured"); + return; + }; + let _ = self + .client + .post(&url) + .header("X-Internal-Auth", self.webhook_auth_secret()) + .json(req) + .send() + .await; + } + + /// Notify the webhook receiver that a node has disconnected. + pub async fn notify_node_disconnected(&self, req: &NodeDisconnectedRequest) { + if !self.is_enabled() { + return; + } + let Ok(url) = self.webhook_endpoint("webhook/node-disconnected") else { + tracing::warn!("skip node-disconnected webhook because webhook_url is not configured"); + return; + }; + let _ = self + .client + .post(&url) + .header("X-Internal-Auth", self.webhook_auth_secret()) + .json(req) + .send() + .await; + } + + fn webhook_auth_secret(&self) -> &str { + self.webhook_secret + .as_deref() + .or(self.internal_auth_token.as_deref()) + .unwrap_or("") + } +} + +pub type SharedWebhookConfig = Arc; diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index f2491fb4..c8bd738f 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -6,6 +6,7 @@ use std::{ }; use anyhow::Context; +use base64::{prelude::BASE64_STANDARD, Engine as _}; use serde::{Deserialize, Serialize}; use tokio::io::AsyncReadExt as _; @@ -405,6 +406,42 @@ impl From for PortForwardConfigPb { } } +pub fn process_secure_mode_cfg(mut user_cfg: SecureModeConfig) -> anyhow::Result { + if !user_cfg.enabled { + return Ok(user_cfg); + } + + let private_key = if user_cfg.local_private_key.is_none() { + // if no private key, generate random one + let private = x25519_dalek::StaticSecret::random_from_rng(rand::rngs::OsRng); + user_cfg.local_private_key = Some(BASE64_STANDARD.encode(private.clone().as_bytes())); + private + } else { + // check if private key is valid + user_cfg.private_key()? + }; + + let public = x25519_dalek::PublicKey::from(&private_key); + + match user_cfg.local_public_key { + None => { + user_cfg.local_public_key = Some(BASE64_STANDARD.encode(public.as_bytes())); + } + Some(ref user_pub) => { + let public = user_cfg.public_key()?; + if *user_pub != BASE64_STANDARD.encode(public.as_bytes()) { + return Err(anyhow::anyhow!( + "local public key {} does not match generated public key {}", + user_pub, + BASE64_STANDARD.encode(public.as_bytes()) + )); + } + } + } + + Ok(user_cfg) +} + #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] struct Config { netns: Option, diff --git a/easytier/src/common/log.rs b/easytier/src/common/log.rs index 0c9b1f08..d6058f19 100644 --- a/easytier/src/common/log.rs +++ b/easytier/src/common/log.rs @@ -146,18 +146,37 @@ pub fn init( std::thread::spawn(move || { while let Ok(lf) = recver.recv() { - let e = file_filter_reloader.modify(|f| { - if let Ok(nf) = EnvFilter::builder() - .with_default_directive(lf.parse::().unwrap().into()) - .from_env() - .with_context(|| "failed to create file filter") - { - info!("Reload log filter succeed, new filter level: {:?}", lf); - *f = nf; + let parsed_level = match lf.parse::() { + Ok(level) => level, + Err(e) => { + error!("Failed to parse new log level {:?}: {}", lf, e); + continue; + } + }; + + let mut new_filter = match EnvFilter::builder() + .with_default_directive(parsed_level.into()) + .from_env() + .with_context(|| "failed to create file filter") + { + Ok(filter) => Some(filter), + Err(e) => { + error!("Failed to build new log filter for {:?}: {:?}", lf, e); + continue; + } + }; + + match file_filter_reloader.modify(|f| { + *f = new_filter + .take() + .expect("log filter reloader only applies one filter per reload"); + }) { + Ok(()) => { + info!("Reload log filter succeed, new filter level: {:?}", lf); + } + Err(e) => { + error!("Failed to reload log filter: {:?}", e); } - }); - if e.is_err() { - error!("Failed to reload log filter: {:?}", e); } } info!("Stop log filter reloader"); diff --git a/easytier/src/common/mod.rs b/easytier/src/common/mod.rs index 73be1ba1..a7bbbbaa 100644 --- a/easytier/src/common/mod.rs +++ b/easytier/src/common/mod.rs @@ -102,6 +102,9 @@ pub fn set_default_machine_id(mid: Option) { pub fn get_machine_id() -> uuid::Uuid { if let Some(default_mid) = use_global_var!(MACHINE_UID) { + if let Ok(mid) = uuid::Uuid::parse_str(default_mid.trim()) { + return mid; + } let mut b = [0u8; 16]; crate::tunnel::generate_digest_from_str("", &default_mid, &mut b); return uuid::Uuid::from_bytes(b); @@ -207,4 +210,12 @@ mod tests { assert_eq!(weak_js.weak_count(), 0); assert_eq!(weak_js.strong_count(), 0); } + + #[test] + fn test_get_machine_id_uses_uuid_seed_verbatim() { + let raw = "33333333-3333-3333-3333-333333333333".to_string(); + set_default_machine_id(Some(raw.clone())); + assert_eq!(get_machine_id(), uuid::Uuid::parse_str(&raw).unwrap()); + set_default_machine_id(None); + } } diff --git a/easytier/src/core.rs b/easytier/src/core.rs index a3434138..234de969 100644 --- a/easytier/src/core.rs +++ b/easytier/src/core.rs @@ -10,9 +10,10 @@ use std::{ use crate::{ common::{ config::{ - get_avaliable_encrypt_methods, load_config_from_file, ConfigFileControl, ConfigLoader, - ConsoleLoggerConfig, FileLoggerConfig, LoggingConfigLoader, NetworkIdentity, - PeerConfig, PortForwardConfig, TomlConfigLoader, VpnPortalConfig, + get_avaliable_encrypt_methods, load_config_from_file, process_secure_mode_cfg, + ConfigFileControl, ConfigLoader, ConsoleLoggerConfig, FileLoggerConfig, + LoggingConfigLoader, NetworkIdentity, PeerConfig, PortForwardConfig, TomlConfigLoader, + VpnPortalConfig, }, constants::EASYTIER_VERSION, log, @@ -27,10 +28,8 @@ use crate::{ web_client, ShellType, }; use anyhow::Context; -use base64::{prelude::BASE64_STANDARD, Engine as _}; use cidr::IpCidr; use clap::{CommandFactory, Parser}; -use rand::rngs::OsRng; use rust_i18n::t; use tokio::io::AsyncReadExt; @@ -773,42 +772,6 @@ impl NetworkOptions { false } - fn process_secure_mode_cfg(mut user_cfg: SecureModeConfig) -> anyhow::Result { - if !user_cfg.enabled { - return Ok(user_cfg); - } - - let private_key = if user_cfg.local_private_key.is_none() { - // if no private key, generate random one - let private = x25519_dalek::StaticSecret::random_from_rng(OsRng); - user_cfg.local_private_key = Some(BASE64_STANDARD.encode(private.clone().as_bytes())); - private - } else { - // check if private key is valid - user_cfg.private_key()? - }; - - let public = x25519_dalek::PublicKey::from(&private_key); - - match user_cfg.local_public_key { - None => { - user_cfg.local_public_key = Some(BASE64_STANDARD.encode(public.as_bytes())); - } - Some(ref user_pub) => { - let public = user_cfg.public_key()?; - if *user_pub != BASE64_STANDARD.encode(public.as_bytes()) { - return Err(anyhow::anyhow!( - "local public key {} does not match generated public key {}", - user_pub, - BASE64_STANDARD.encode(public.as_bytes()) - )); - } - } - } - - Ok(user_cfg) - } - fn merge_into(&self, cfg: &TomlConfigLoader) -> anyhow::Result<()> { if self.hostname.is_some() { cfg.set_hostname(self.hostname.clone()); @@ -1006,7 +969,7 @@ impl NetworkOptions { local_private_key: Some(credential_secret.clone()), local_public_key: None, }; - cfg.set_secure_mode(Some(Self::process_secure_mode_cfg(c)?)); + cfg.set_secure_mode(Some(process_secure_mode_cfg(c)?)); } else if let Some(secure_mode) = self.secure_mode { if secure_mode { let c = SecureModeConfig { @@ -1014,7 +977,7 @@ impl NetworkOptions { local_private_key: self.local_private_key.clone(), local_public_key: self.local_public_key.clone(), }; - cfg.set_secure_mode(Some(Self::process_secure_mode_cfg(c)?)); + cfg.set_secure_mode(Some(process_secure_mode_cfg(c)?)); } } diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index a86bf37c..052362f8 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -1,8 +1,12 @@ use std::{ + collections::{BTreeMap, HashMap}, ffi::OsString, + future::Future, net::{IpAddr, SocketAddr}, path::PathBuf, + pin::Pin, str::FromStr, + sync::Arc, time::Duration, vec, }; @@ -29,6 +33,7 @@ use easytier::{ }, peers, proto::{ + acl::AclStats, api::{ config::{ AclPatch, ConfigPatchAction, ConfigRpc, ConfigRpcClientFactory, @@ -36,24 +41,32 @@ use easytier::{ }, instance::{ instance_identifier::{InstanceSelector, Selector}, - list_peer_route_pair, AclManageRpc, AclManageRpcClientFactory, ConnectorManageRpc, + list_global_foreign_network_response, list_peer_route_pair, AclManageRpc, + AclManageRpcClientFactory, Connector, ConnectorManageRpc, ConnectorManageRpcClientFactory, CredentialManageRpc, - CredentialManageRpcClientFactory, DumpRouteRequest, GenerateCredentialRequest, - GetAclStatsRequest, GetPrometheusStatsRequest, GetStatsRequest, - GetVpnPortalInfoRequest, GetWhitelistRequest, InstanceIdentifier, - ListConnectorRequest, ListCredentialsRequest, ListForeignNetworkRequest, + CredentialManageRpcClientFactory, DumpRouteRequest, ForeignNetworkEntryPb, + GenerateCredentialRequest, GetAclStatsRequest, GetPrometheusStatsRequest, + GetStatsRequest, GetVpnPortalInfoRequest, GetWhitelistRequest, + GetWhitelistResponse, InstanceIdentifier, ListConnectorRequest, + ListCredentialsRequest, ListCredentialsResponse, ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListMappedListenerRequest, ListPeerRequest, - ListPeerResponse, ListPortForwardRequest, ListRouteRequest, ListRouteResponse, - MappedListenerManageRpc, MappedListenerManageRpcClientFactory, NodeInfo, - PeerManageRpc, PeerManageRpcClientFactory, PortForwardManageRpc, + ListPeerResponse, ListPortForwardRequest, ListPortForwardResponse, + ListRouteRequest, ListRouteResponse, MappedListener, MappedListenerManageRpc, + MappedListenerManageRpcClientFactory, MetricSnapshot, NodeInfo, PeerManageRpc, + PeerManageRpcClientFactory, PortForwardManageRpc, PortForwardManageRpcClientFactory, RevokeCredentialRequest, ShowNodeInfoRequest, StatsRpc, StatsRpcClientFactory, TcpProxyEntryState, TcpProxyEntryTransportType, - TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc, VpnPortalRpcClientFactory, + TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalInfo, VpnPortalRpc, + VpnPortalRpcClientFactory, }, logger::{ GetLoggerConfigRequest, LogLevel, LoggerRpc, LoggerRpcClientFactory, SetLoggerConfigRequest, }, + manage::{ + ListNetworkInstanceMetaRequest, ListNetworkInstanceRequest, WebClientService, + WebClientServiceClientFactory, + }, }, common::{NatType, PortForwardConfigPb, SocketType}, peer_rpc::{GetGlobalPeerMapRequest, PeerCenterRpc, PeerCenterRpcClientFactory}, @@ -437,17 +450,251 @@ struct InstallArgs { type Error = anyhow::Error; +#[derive(Clone, Debug)] +struct InstanceTarget { + identifier: InstanceIdentifier, + instance_id: String, + instance_name: String, +} + +struct InstanceResult { + target: Option, + value: T, +} + +impl InstanceTarget { + fn label(&self) -> String { + match (self.instance_name.is_empty(), self.instance_id.is_empty()) { + (false, false) => format!("{} ({})", self.instance_name, self.instance_id), + (false, true) => self.instance_name.clone(), + (true, false) => self.instance_id.clone(), + (true, true) => "selected instance".to_string(), + } + } +} + +impl InstanceResult { + fn new(target: Option, value: T) -> Self { + Self { target, value } + } + + fn map(self, f: impl FnOnce(T) -> U) -> InstanceResult { + InstanceResult { + target: self.target, + value: f(self.value), + } + } +} + struct CommandHandler<'a> { - client: tokio::sync::Mutex, + client: Arc>, verbose: bool, output_format: &'a OutputFormat, no_trunc: bool, + instance_select: &'a InstanceSelectArgs, instance_selector: InstanceIdentifier, + resolved_target: Option, } type RpcClient = StandAloneClient; +type LocalBoxFuture<'a, T> = Pin> + 'a>>; +type ForeignNetworkMap = BTreeMap; +type GlobalForeignNetworkMap = BTreeMap; + +#[derive(serde::Serialize)] +struct PeerListData { + node_info: NodeInfo, + peer_routes: Vec, +} + +#[derive(serde::Serialize)] +struct RouteListData { + node_info: NodeInfo, + peer_routes: Vec, +} + +#[derive(serde::Serialize)] +struct PeerCenterRowData { + node_id: String, + hostname: String, + ipv4: String, + direct_peers: Vec, +} + +#[derive(serde::Serialize)] +struct PeerCenterDirectPeerData { + node_id: String, + hostname: String, + ipv4: String, + latency_ms: i32, +} + +impl<'a> CommandHandler<'a> { + fn has_explicit_instance_selector(&self) -> bool { + self.instance_select.id.is_some() || self.instance_select.name.is_some() + } + + fn scoped_to_instance(&self, target: &InstanceTarget) -> Self { + Self { + client: self.client.clone(), + verbose: self.verbose, + output_format: self.output_format, + no_trunc: self.no_trunc, + instance_select: self.instance_select, + instance_selector: target.identifier.clone(), + resolved_target: Some(target.clone()), + } + } + + fn print_target_header(&self, target: &InstanceTarget) { + println!("== {} ==", target.label()); + } + + async fn get_manage_client( + &self, + ) -> Result>, Error> { + Ok(self + .client + .lock() + .await + .scoped_client::>("".to_string()) + .await + .with_context(|| "failed to get manage client")?) + } + + async fn fanout_targets(&self) -> Result>, Error> { + if self.resolved_target.is_some() || self.has_explicit_instance_selector() { + return Ok(None); + } + + let client = self.get_manage_client().await?; + let inst_ids = client + .list_network_instance(BaseController::default(), ListNetworkInstanceRequest {}) + .await? + .inst_ids + .into_iter() + .map(uuid::Uuid::from) + .collect::>(); + + if inst_ids.is_empty() { + return Err(anyhow::anyhow!("no running instances found")); + } + + let metas = client + .list_network_instance_meta( + BaseController::default(), + ListNetworkInstanceMetaRequest { + inst_ids: inst_ids.iter().cloned().map(Into::into).collect(), + }, + ) + .await? + .metas; + + let mut name_map = HashMap::new(); + for meta in metas { + if let Some(inst_id) = meta.inst_id { + name_map.insert( + uuid::Uuid::from(inst_id), + if meta.instance_name.is_empty() { + meta.network_name + } else { + meta.instance_name + }, + ); + } + } + + let mut targets = inst_ids + .into_iter() + .map(|inst_id| InstanceTarget { + identifier: InstanceIdentifier { + selector: Some(Selector::Id(inst_id.into())), + }, + instance_id: inst_id.to_string(), + instance_name: name_map.remove(&inst_id).unwrap_or_default(), + }) + .collect::>(); + + targets.sort_by_key(|a| a.label()); + Ok(Some(targets)) + } + + async fn collect_instance_results( + &self, + fetch: F, + ) -> Result>, Error> + where + F: for<'b> Fn(&'b CommandHandler<'a>) -> LocalBoxFuture<'b, T>, + { + if let Some(targets) = self.fanout_targets().await? { + let mut results = Vec::with_capacity(targets.len()); + for target in targets { + let scoped = self.scoped_to_instance(&target); + let value = fetch(&scoped) + .await + .with_context(|| format!("instance {}", target.label()))?; + results.push(InstanceResult::new(Some(target), value)); + } + Ok(results) + } else { + Ok(vec![InstanceResult::new(None, fetch(self).await?)]) + } + } + + async fn apply_to_instances(&self, apply: F) -> Result<(), Error> + where + F: for<'b> Fn(&'b CommandHandler<'a>) -> LocalBoxFuture<'b, ()>, + { + self.collect_instance_results(apply).await?; + Ok(()) + } + + fn print_results( + &self, + results: &[InstanceResult], + mut render: impl FnMut(&T) -> Result<(), Error>, + ) -> Result<(), Error> { + let multi = results.len() > 1; + for (idx, result) in results.iter().enumerate() { + if multi { + if idx > 0 { + println!(); + } + if let Some(target) = result.target.as_ref() { + self.print_target_header(target); + } + } + render(&result.value)?; + } + Ok(()) + } + + fn print_json_results( + &self, + results: Vec>, + ) -> Result<(), Error> { + if results.len() == 1 { + println!("{}", serde_json::to_string_pretty(&results[0].value)?); + return Ok(()); + } + + let wrapped = results + .into_iter() + .map(|result| { + let target = result + .target + .ok_or_else(|| anyhow::anyhow!("missing instance target for multi-result"))?; + Ok(serde_json::json!({ + "instance_id": target.instance_id, + "instance_name": target.instance_name, + "result": result.value, + })) + }) + .collect::, Error>>()?; + println!("{}", serde_json::to_string_pretty(&wrapped)?); + Ok(()) + } -impl CommandHandler<'_> { async fn get_peer_manager_client( &self, ) -> Result>, Error> { @@ -619,6 +866,279 @@ impl CommandHandler<'_> { Ok(list_peer_route_pair(peers, routes)) } + async fn fetch_node_info(&self) -> Result { + self.get_peer_manager_client() + .await? + .show_node_info( + BaseController::default(), + ShowNodeInfoRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .node_info + .ok_or(anyhow::anyhow!("node info not found")) + } + + async fn fetch_peer_list_data(&self) -> Result { + Ok(PeerListData { + node_info: self.fetch_node_info().await?, + peer_routes: self.list_peer_route_pair().await?, + }) + } + + async fn fetch_route_dump(&self) -> Result { + Ok(self + .get_peer_manager_client() + .await? + .dump_route( + BaseController::default(), + DumpRouteRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .result) + } + + async fn fetch_foreign_networks(&self) -> Result { + Ok(self + .get_peer_manager_client() + .await? + .list_foreign_network( + BaseController::default(), + ListForeignNetworkRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .foreign_networks) + } + + async fn fetch_global_foreign_networks(&self) -> Result { + Ok(self + .get_peer_manager_client() + .await? + .list_global_foreign_network( + BaseController::default(), + ListGlobalForeignNetworkRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .foreign_networks) + } + + async fn fetch_route_list_data(&self) -> Result { + Ok(RouteListData { + node_info: self.fetch_node_info().await?, + peer_routes: self.list_peer_route_pair().await?, + }) + } + + async fn fetch_connector_list(&self) -> Result, Error> { + Ok(self + .get_connector_manager_client() + .await? + .list_connector( + BaseController::default(), + ListConnectorRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .connectors) + } + + async fn fetch_acl_stats(&self) -> Result, Error> { + Ok(self + .get_acl_manager_client() + .await? + .get_acl_stats( + BaseController::default(), + GetAclStatsRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .acl_stats) + } + + async fn fetch_mapped_listener_list(&self) -> Result, Error> { + Ok(self + .get_mapped_listener_manager_client() + .await? + .list_mapped_listener( + BaseController::default(), + ListMappedListenerRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .mappedlisteners) + } + + async fn fetch_port_forward_list(&self) -> Result { + Ok(self + .get_port_forward_manager_client() + .await? + .list_port_forward( + BaseController::default(), + ListPortForwardRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await?) + } + + async fn fetch_whitelist(&self) -> Result { + Ok(self + .get_acl_manager_client() + .await? + .get_whitelist( + BaseController::default(), + GetWhitelistRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await?) + } + + async fn fetch_credential_list(&self) -> Result { + Ok(self + .get_credential_client() + .await? + .list_credentials( + BaseController::default(), + ListCredentialsRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await?) + } + + async fn fetch_peer_center_rows(&self) -> Result, Error> { + struct PeerCenterNodeInfo { + hostname: String, + ipv4: String, + } + + let resp = self + .get_peer_center_client() + .await? + .get_global_peer_map( + BaseController::default(), + GetGlobalPeerMapRequest::default(), + ) + .await?; + let route_infos = self.list_peer_route_pair().await?; + let node_id_to_node_info = DashMap::new(); + let node_info = self.fetch_node_info().await?; + node_id_to_node_info.insert( + node_info.peer_id, + PeerCenterNodeInfo { + hostname: node_info.hostname.clone(), + ipv4: node_info.ipv4_addr, + }, + ); + for route_info in route_infos { + let Some(peer_id) = route_info.route.as_ref().map(|x| x.peer_id) else { + continue; + }; + node_id_to_node_info.insert( + peer_id, + PeerCenterNodeInfo { + hostname: route_info + .route + .as_ref() + .map(|x| x.hostname.clone()) + .unwrap_or_default(), + ipv4: route_info + .route + .as_ref() + .and_then(|x| x.ipv4_addr) + .map(|x| x.to_string()) + .unwrap_or_default(), + }, + ); + } + + Ok(resp + .global_peer_map + .iter() + .map(|(node_id, directs)| PeerCenterRowData { + node_id: node_id.to_string(), + hostname: node_id_to_node_info + .get(node_id) + .map(|x| x.hostname.clone()) + .unwrap_or_default(), + ipv4: node_id_to_node_info + .get(node_id) + .map(|x| x.ipv4.clone()) + .unwrap_or_default(), + direct_peers: directs + .direct_peers + .iter() + .map(|(k, v)| PeerCenterDirectPeerData { + node_id: k.to_string(), + hostname: node_id_to_node_info + .get(k) + .map(|x| x.hostname.clone()) + .unwrap_or_default(), + ipv4: node_id_to_node_info + .get(k) + .map(|x| x.ipv4.clone()) + .unwrap_or_default(), + latency_ms: v.latency_ms, + }) + .collect(), + }) + .collect()) + } + + async fn fetch_vpn_portal_info(&self) -> Result { + Ok(self + .get_vpn_portal_client() + .await? + .get_vpn_portal_info( + BaseController::default(), + GetVpnPortalInfoRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .vpn_portal_info + .unwrap_or_default()) + } + + async fn fetch_stats(&self) -> Result, Error> { + Ok(self + .get_stats_client() + .await? + .get_stats( + BaseController::default(), + GetStatsRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .metrics) + } + + async fn fetch_prometheus_stats(&self) -> Result { + Ok(self + .get_stats_client() + .await? + .get_prometheus_stats( + BaseController::default(), + GetPrometheusStatsRequest { + instance: Some(self.instance_selector.clone()), + }, + ) + .await? + .prometheus_text) + } + #[allow(dead_code)] fn handle_peer_add(&self, _args: PeerArgs) { println!("add peer"); @@ -713,161 +1233,149 @@ impl CommandHandler<'_> { } } - let mut items: Vec = vec![]; - let peer_routes = self.list_peer_route_pair().await?; + let build_items = |data: &PeerListData| { + let mut items = Vec::with_capacity(data.peer_routes.len() + 1); + items.push(PeerTableItem::from(data.node_info.clone())); + items.extend(data.peer_routes.iter().cloned().map(Into::into)); + items.sort_by(|a, b| { + use std::net::{IpAddr, Ipv4Addr}; + + let a_is_local = a.cost == "Local"; + let b_is_local = b.cost == "Local"; + if a_is_local != b_is_local { + return if a_is_local { + std::cmp::Ordering::Less + } else { + std::cmp::Ordering::Greater + }; + } + + let a_is_public = a.hostname.starts_with(peers::PUBLIC_SERVER_HOSTNAME_PREFIX); + let b_is_public = b.hostname.starts_with(peers::PUBLIC_SERVER_HOSTNAME_PREFIX); + if a_is_public != b_is_public { + return if a_is_public { + std::cmp::Ordering::Less + } else { + std::cmp::Ordering::Greater + }; + } + + let a_ip = IpAddr::from_str(&a.ipv4).unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)); + let b_ip = IpAddr::from_str(&b.ipv4).unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)); + match a_ip.cmp(&b_ip) { + std::cmp::Ordering::Equal => a.hostname.cmp(&b.hostname), + other => other, + } + }); + items + }; + + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_peer_list_data())) + .await?; + if self.verbose { - println!("{}", serde_json::to_string_pretty(&peer_routes)?); - return Ok(()); + return self.print_json_results( + results + .into_iter() + .map(|result| result.map(|data| data.peer_routes)) + .collect(), + ); + } + if *self.output_format == OutputFormat::Json { + return self.print_json_results( + results + .into_iter() + .map(|result| result.map(|data| build_items(&data))) + .collect(), + ); } - let client = self.get_peer_manager_client().await?; - let node_info = client - .show_node_info( - BaseController::default(), - ShowNodeInfoRequest { - instance: Some(self.instance_selector.clone()), - }, + self.print_results(&results, |data| { + let items = build_items(data); + print_output( + &items, + self.output_format, + &["tunnel", "version"], + &["version", "tunnel", "nat", "tx", "rx", "loss", "lat(ms)"], + self.no_trunc, ) - .await? - .node_info - .ok_or(anyhow::anyhow!("node info not found"))?; - items.push(node_info.into()); - - for p in peer_routes { - items.push(p.into()); - } - - // Sort items: local IP first, then public servers, then other servers by IP - items.sort_by(|a, b| { - use std::net::{IpAddr, Ipv4Addr}; - use std::str::FromStr; - - // Priority 1: Local IP (cost is "Local") - let a_is_local = a.cost == "Local"; - let b_is_local = b.cost == "Local"; - if a_is_local != b_is_local { - return if a_is_local { - std::cmp::Ordering::Less - } else { - std::cmp::Ordering::Greater - }; - } - - // Priority 2: Public servers - let a_is_public = a.hostname.starts_with(peers::PUBLIC_SERVER_HOSTNAME_PREFIX); - let b_is_public = b.hostname.starts_with(peers::PUBLIC_SERVER_HOSTNAME_PREFIX); - if a_is_public != b_is_public { - return if a_is_public { - std::cmp::Ordering::Less - } else { - std::cmp::Ordering::Greater - }; - } - - // Priority 3: Sort by IP address - let a_ip = IpAddr::from_str(&a.ipv4).unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)); - let b_ip = IpAddr::from_str(&b.ipv4).unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)); - match a_ip.cmp(&b_ip) { - std::cmp::Ordering::Equal => a.hostname.cmp(&b.hostname), - other => other, - } - }); - - print_output( - &items, - self.output_format, - &["tunnel", "version"], - &["version", "tunnel", "nat", "tx", "rx", "loss", "lat(ms)"], - self.no_trunc, - )?; - - Ok(()) + }) } async fn handle_route_dump(&self) -> Result<(), Error> { - let client = self.get_peer_manager_client().await?; - let request = DumpRouteRequest { - instance: Some(self.instance_selector.clone()), - }; - let response = client - .dump_route(BaseController::default(), request) + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_route_dump())) .await?; - println!("response: {}", response.result); - Ok(()) + if self.verbose || *self.output_format == OutputFormat::Json { + return self.print_json_results(results); + } + self.print_results(&results, |result| { + println!("response: {}", result); + Ok(()) + }) } async fn handle_foreign_network_list(&self) -> Result<(), Error> { - let client = self.get_peer_manager_client().await?; - let request = ListForeignNetworkRequest { - instance: Some(self.instance_selector.clone()), - }; - let response = client - .list_foreign_network(BaseController::default(), request) + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_foreign_networks())) .await?; - let network_map = response; if self.verbose || *self.output_format == OutputFormat::Json { - let json = serde_json::to_string_pretty(&network_map.foreign_networks)?; - println!("{}", json); - return Ok(()); + return self.print_json_results(results); } - for (idx, (k, v)) in network_map.foreign_networks.iter().enumerate() { - println!("{} Network Name: {}", idx + 1, k); - for peer in v.peers.iter() { - println!( - " peer_id: {}, peer_conn_count: {}, conns: [ {} ]", - peer.peer_id, - peer.conns.len(), - peer.conns - .iter() - .map(|conn| format!( - "remote_addr: {}, rx_bytes: {}, tx_bytes: {}, latency_us: {}", - conn.tunnel - .as_ref() - .map(|t| t.remote_addr.clone().unwrap_or_default()) - .unwrap_or_default(), - conn.stats.as_ref().map(|s| s.rx_bytes).unwrap_or_default(), - conn.stats.as_ref().map(|s| s.tx_bytes).unwrap_or_default(), - conn.stats - .as_ref() - .map(|s| s.latency_us) - .unwrap_or_default(), - )) - .collect::>() - .join("; ") - ); + self.print_results(&results, |networks| { + for (idx, (k, v)) in networks.iter().enumerate() { + println!("{} Network Name: {}", idx + 1, k); + for peer in v.peers.iter() { + println!( + " peer_id: {}, peer_conn_count: {}, conns: [ {} ]", + peer.peer_id, + peer.conns.len(), + peer.conns + .iter() + .map(|conn| format!( + "remote_addr: {}, rx_bytes: {}, tx_bytes: {}, latency_us: {}", + conn.tunnel + .as_ref() + .map(|t| t.remote_addr.clone().unwrap_or_default()) + .unwrap_or_default(), + conn.stats.as_ref().map(|s| s.rx_bytes).unwrap_or_default(), + conn.stats.as_ref().map(|s| s.tx_bytes).unwrap_or_default(), + conn.stats + .as_ref() + .map(|s| s.latency_us) + .unwrap_or_default(), + )) + .collect::>() + .join("; ") + ); + } } - } - Ok(()) + Ok(()) + }) } async fn handle_global_foreign_network_list(&self) -> Result<(), Error> { - let client = self.get_peer_manager_client().await?; - let request = ListGlobalForeignNetworkRequest { - instance: Some(self.instance_selector.clone()), - }; - let response = client - .list_global_foreign_network(BaseController::default(), request) + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_global_foreign_networks())) .await?; if self.verbose || *self.output_format == OutputFormat::Json { - println!( - "{}", - serde_json::to_string_pretty(&response.foreign_networks)? - ); - return Ok(()); + return self.print_json_results(results); } - for (k, v) in response.foreign_networks.iter() { - println!("Peer ID: {}", k); - for n in v.foreign_networks.iter() { - println!( - " Network Name: {}, Last Updated: {}, Version: {}, PeerIds: {:?}", - n.network_name, n.last_updated, n.version, n.peer_ids - ); + self.print_results(&results, |networks| { + for (k, v) in networks.iter() { + println!("Peer ID: {}", k); + for n in v.foreign_networks.iter() { + println!( + " Network Name: {}, Last Updated: {}, Version: {}, PeerIds: {:?}", + n.network_name, n.last_updated, n.version, n.peer_ids + ); + } } - } - - Ok(()) + Ok(()) + }) } async fn handle_route_list(&self) -> Result<(), Error> { @@ -891,198 +1399,168 @@ impl CommandHandler<'_> { version: String, } - let mut items: Vec = vec![]; - let client = self.get_peer_manager_client().await?; - let node_info = client - .show_node_info( - BaseController::default(), - ShowNodeInfoRequest { - instance: Some(self.instance_selector.clone()), - }, - ) - .await? - .node_info - .ok_or(anyhow::anyhow!("node info not found"))?; - let peer_routes = self.list_peer_route_pair().await?; + let build_items = |data: &RouteListData| { + let mut items = vec![RouteTableItem { + ipv4: data.node_info.ipv4_addr.clone(), + hostname: data.node_info.hostname.clone(), + proxy_cidrs: data.node_info.proxy_cidrs.join(", "), + next_hop_ipv4: "-".to_string(), + next_hop_hostname: "Local".to_string(), + next_hop_lat: 0.0, + path_len: 0, + path_latency: 0, + next_hop_ipv4_lat_first: "-".to_string(), + next_hop_hostname_lat_first: "Local".to_string(), + path_len_lat_first: 0, + path_latency_lat_first: 0, + version: data.node_info.version.clone(), + }]; + + for p in data.peer_routes.iter() { + let Some(next_hop_pair) = data.peer_routes.iter().find(|pair| { + pair.route.clone().unwrap_or_default().peer_id + == p.route.clone().unwrap_or_default().next_hop_peer_id + }) else { + continue; + }; + + let next_hop_pair_latency_first = data.peer_routes.iter().find(|pair| { + pair.route.clone().unwrap_or_default().peer_id + == p.route + .clone() + .unwrap_or_default() + .next_hop_peer_id_latency_first + .unwrap_or_default() + }); + + let route = p.route.clone().unwrap_or_default(); + items.push(RouteTableItem { + ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), + hostname: route.hostname.clone(), + proxy_cidrs: route.proxy_cidrs.clone().join(","), + next_hop_ipv4: if route.cost == 1 { + "DIRECT".to_string() + } else { + next_hop_pair + .route + .clone() + .unwrap_or_default() + .ipv4_addr + .map(|ip| ip.to_string()) + .unwrap_or_default() + }, + next_hop_hostname: if route.cost == 1 { + "DIRECT".to_string() + } else { + next_hop_pair.route.clone().unwrap_or_default().hostname + }, + next_hop_lat: next_hop_pair.get_latency_ms().unwrap_or(0.0), + path_len: route.cost, + path_latency: route.path_latency, + next_hop_ipv4_lat_first: if route.cost_latency_first.unwrap_or_default() == 1 { + "DIRECT".to_string() + } else { + next_hop_pair_latency_first + .map(|pair| pair.route.clone().unwrap_or_default().ipv4_addr) + .unwrap_or_default() + .map(|ip| ip.to_string()) + .unwrap_or_default() + }, + next_hop_hostname_lat_first: if route.cost_latency_first.unwrap_or_default() + == 1 + { + "DIRECT".to_string() + } else { + next_hop_pair_latency_first + .map(|pair| pair.route.clone().unwrap_or_default().hostname) + .unwrap_or_default() + }, + path_latency_lat_first: route.path_latency_latency_first.unwrap_or_default(), + path_len_lat_first: route.cost_latency_first.unwrap_or_default(), + version: if route.version.is_empty() { + "unknown".to_string() + } else { + route.version + }, + }); + } + + items + }; + + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_route_list_data())) + .await?; if self.verbose { - #[derive(serde::Serialize)] - struct VerboseItem { - node_info: NodeInfo, - peer_routes: Vec, - } - println!( - "{}", - serde_json::to_string_pretty(&VerboseItem { - node_info, - peer_routes - })? + return self.print_json_results(results); + } + if *self.output_format == OutputFormat::Json { + return self.print_json_results( + results + .into_iter() + .map(|result| result.map(|data| build_items(&data))) + .collect(), ); - return Ok(()); } - items.push(RouteTableItem { - ipv4: node_info.ipv4_addr.clone(), - hostname: node_info.hostname.clone(), - proxy_cidrs: node_info.proxy_cidrs.join(", "), - - next_hop_ipv4: "-".to_string(), - next_hop_hostname: "Local".to_string(), - next_hop_lat: 0.0, - path_len: 0, - path_latency: 0, - - next_hop_ipv4_lat_first: "-".to_string(), - next_hop_hostname_lat_first: "Local".to_string(), - path_len_lat_first: 0, - path_latency_lat_first: 0, - - version: node_info.version.clone(), - }); - for p in peer_routes.iter() { - let Some(next_hop_pair) = peer_routes.iter().find(|pair| { - pair.route.clone().unwrap_or_default().peer_id - == p.route.clone().unwrap_or_default().next_hop_peer_id - }) else { - continue; - }; - - let next_hop_pair_latency_first = peer_routes.iter().find(|pair| { - pair.route.clone().unwrap_or_default().peer_id - == p.route - .clone() - .unwrap_or_default() - .next_hop_peer_id_latency_first - .unwrap_or_default() - }); - - let route = p.route.clone().unwrap_or_default(); - items.push(RouteTableItem { - ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), - hostname: route.hostname.clone(), - proxy_cidrs: route.proxy_cidrs.clone().join(",").to_string(), - next_hop_ipv4: if route.cost == 1 { - "DIRECT".to_string() - } else { - next_hop_pair - .route - .clone() - .unwrap_or_default() - .ipv4_addr - .map(|ip| ip.to_string()) - .unwrap_or_default() - }, - next_hop_hostname: if route.cost == 1 { - "DIRECT".to_string() - } else { - next_hop_pair - .route - .clone() - .unwrap_or_default() - .hostname - .clone() - }, - next_hop_lat: next_hop_pair.get_latency_ms().unwrap_or(0.0), - path_len: route.cost, - path_latency: route.path_latency, - - next_hop_ipv4_lat_first: if route.cost_latency_first.unwrap_or_default() == 1 { - "DIRECT".to_string() - } else { - next_hop_pair_latency_first - .map(|pair| pair.route.clone().unwrap_or_default().ipv4_addr) - .unwrap_or_default() - .map(|ip| ip.to_string()) - .unwrap_or_default() - }, - next_hop_hostname_lat_first: if route.cost_latency_first.unwrap_or_default() == 1 { - "DIRECT".to_string() - } else { - next_hop_pair_latency_first - .map(|pair| pair.route.clone().unwrap_or_default().hostname) - .unwrap_or_default() - .clone() - }, - path_latency_lat_first: route.path_latency_latency_first.unwrap_or_default(), - path_len_lat_first: route.cost_latency_first.unwrap_or_default(), - - version: if route.version.is_empty() { - "unknown".to_string() - } else { - route.version.to_string() - }, - }); - } - - print_output( - &items, - self.output_format, - &["proxy_cidrs", "version"], - &["proxy_cidrs", "version"], - self.no_trunc, - )?; - - Ok(()) + self.print_results(&results, |data| { + let items = build_items(data); + print_output( + &items, + self.output_format, + &["proxy_cidrs", "version"], + &["proxy_cidrs", "version"], + self.no_trunc, + ) + }) } async fn handle_connector_list(&self) -> Result<(), Error> { - let client = self.get_connector_manager_client().await?; - let request = ListConnectorRequest { - instance: Some(self.instance_selector.clone()), - }; - let response = client - .list_connector(BaseController::default(), request) + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_connector_list())) .await?; if self.verbose || *self.output_format == OutputFormat::Json { - println!("{}", serde_json::to_string_pretty(&response.connectors)?); - return Ok(()); + return self.print_json_results(results); } - println!("response: {:#?}", response); - Ok(()) + self.print_results(&results, |connectors| { + println!("response: {:#?}", connectors); + Ok(()) + }) } async fn handle_acl_stats(&self) -> Result<(), Error> { - let client = self.get_acl_manager_client().await?; - let request = GetAclStatsRequest { - instance: Some(self.instance_selector.clone()), - }; - let response = client - .get_acl_stats(BaseController::default(), request) + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_acl_stats())) .await?; - - if let Some(acl_stats) = response.acl_stats { - if self.output_format == &OutputFormat::Json { - println!("{}", serde_json::to_string_pretty(&acl_stats)?); - } else { - println!("{}", acl_stats); - } - } else { - println!("No ACL statistics available"); + if *self.output_format == OutputFormat::Json { + return self.print_json_results(results); } - Ok(()) + self.print_results(&results, |acl_stats| { + if let Some(acl_stats) = acl_stats { + println!("{}", acl_stats); + } else { + println!("No ACL statistics available"); + } + Ok(()) + }) } async fn handle_mapped_listener_list(&self) -> Result<(), Error> { - let client = self.get_mapped_listener_manager_client().await?; - let request = ListMappedListenerRequest { - instance: Some(self.instance_selector.clone()), - }; - let response = client - .list_mapped_listener(BaseController::default(), request) + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_mapped_listener_list())) .await?; if self.verbose || *self.output_format == OutputFormat::Json { - println!( - "{}", - serde_json::to_string_pretty(&response.mappedlisteners)? - ); - return Ok(()); + return self.print_json_results(results); } - println!("response: {:#?}", response); - Ok(()) + self.print_results(&results, |listeners| { + println!("response: {:#?}", listeners); + Ok(()) + }) } - async fn handle_mapped_listener_modify( + async fn apply_mapped_listener_modify( &self, url: &str, action: ConfigPatchAction, @@ -1105,6 +1583,19 @@ impl CommandHandler<'_> { Ok(()) } + async fn handle_mapped_listener_modify( + &self, + url: &str, + action: ConfigPatchAction, + ) -> Result<(), Error> { + let url = url.to_string(); + self.apply_to_instances(|handler| { + let url = url.clone(); + Box::pin(async move { handler.apply_mapped_listener_modify(&url, action).await }) + }) + .await + } + fn mapped_listener_validate_url(url: &str) -> Result { let url = url::Url::parse(url)?; if url.scheme() != "tcp" && url.scheme() != "udp" { @@ -1117,7 +1608,7 @@ impl CommandHandler<'_> { Ok(url) } - async fn handle_port_forward_modify( + async fn apply_port_forward_modify( &self, action: ConfigPatchAction, protocol: &str, @@ -1162,18 +1653,35 @@ impl CommandHandler<'_> { Ok(()) } - async fn handle_port_forward_list(&self) -> Result<(), Error> { - let client = self.get_port_forward_manager_client().await?; - let request = ListPortForwardRequest { - instance: Some(self.instance_selector.clone()), - }; - let response = client - .list_port_forward(BaseController::default(), request) - .await?; + async fn handle_port_forward_modify( + &self, + action: ConfigPatchAction, + protocol: &str, + bind_addr: &str, + dst_addr: Option<&str>, + ) -> Result<(), Error> { + let protocol = protocol.to_string(); + let bind_addr = bind_addr.to_string(); + let dst_addr = dst_addr.map(str::to_string); + self.apply_to_instances(|handler| { + let protocol = protocol.clone(); + let bind_addr = bind_addr.clone(); + let dst_addr = dst_addr.clone(); + Box::pin(async move { + handler + .apply_port_forward_modify(action, &protocol, &bind_addr, dst_addr.as_deref()) + .await + }) + }) + .await + } + async fn handle_port_forward_list(&self) -> Result<(), Error> { + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_port_forward_list())) + .await?; if self.verbose || *self.output_format == OutputFormat::Json { - println!("{}", serde_json::to_string_pretty(&response)?); - return Ok(()); + return self.print_json_results(results); } #[derive(tabled::Tabled, serde::Serialize)] @@ -1183,30 +1691,32 @@ impl CommandHandler<'_> { dst_addr: String, } - let items: Vec = response - .cfgs - .into_iter() - .map(|rule| PortForwardTableItem { - protocol: format!( - "{:?}", - SocketType::try_from(rule.socket_type).unwrap_or(SocketType::Tcp) - ), - bind_addr: rule - .bind_addr - .map(|addr| addr.to_string()) - .unwrap_or_default(), - dst_addr: rule - .dst_addr - .map(|addr| addr.to_string()) - .unwrap_or_default(), - }) - .collect(); + self.print_results(&results, |response| { + let items: Vec = response + .cfgs + .iter() + .cloned() + .map(|rule| PortForwardTableItem { + protocol: format!( + "{:?}", + SocketType::try_from(rule.socket_type).unwrap_or(SocketType::Tcp) + ), + bind_addr: rule + .bind_addr + .map(|addr| addr.to_string()) + .unwrap_or_default(), + dst_addr: rule + .dst_addr + .map(|addr| addr.to_string()) + .unwrap_or_default(), + }) + .collect(); - print_output(&items, self.output_format, &[], &[], self.no_trunc)?; - Ok(()) + print_output(&items, self.output_format, &[], &[], self.no_trunc) + }) } - async fn handle_whitelist_set_tcp(&self, ports: &str) -> Result<(), Error> { + async fn apply_whitelist_set(&self, ports: &str, is_tcp: bool) -> Result<(), Error> { let mut whitelist = Self::parse_port_list(ports)? .into_iter() .map(|p| StringPatch { @@ -1227,7 +1737,8 @@ impl CommandHandler<'_> { instance: Some(self.instance_selector.clone()), patch: Some(InstanceConfigPatch { acl: Some(AclPatch { - tcp_whitelist: whitelist, + tcp_whitelist: if is_tcp { whitelist.clone() } else { vec![] }, + udp_whitelist: if is_tcp { vec![] } else { whitelist }, ..Default::default() }), ..Default::default() @@ -1237,32 +1748,54 @@ impl CommandHandler<'_> { client .patch_config(BaseController::default(), request) .await?; + Ok(()) + } + + async fn handle_whitelist_set_tcp(&self, ports: &str) -> Result<(), Error> { + let ports = ports.to_string(); + self.apply_to_instances(|handler| { + let ports = ports.clone(); + Box::pin(async move { handler.apply_whitelist_set(&ports, true).await }) + }) + .await?; println!("TCP whitelist updated: {}", ports); Ok(()) } async fn handle_whitelist_set_udp(&self, ports: &str) -> Result<(), Error> { - let mut whitelist = Self::parse_port_list(ports)? - .into_iter() - .map(|p| StringPatch { - action: ConfigPatchAction::Add.into(), - value: p, - }) - .collect::>(); - whitelist.insert( - 0, - StringPatch { - action: ConfigPatchAction::Clear.into(), - value: "".to_string(), - }, - ); + let ports = ports.to_string(); + self.apply_to_instances(|handler| { + let ports = ports.clone(); + Box::pin(async move { handler.apply_whitelist_set(&ports, false).await }) + }) + .await?; + println!("UDP whitelist updated: {}", ports); + Ok(()) + } + + async fn apply_whitelist_clear(&self, is_tcp: bool) -> Result<(), Error> { let client = self.get_config_client().await?; let request = PatchConfigRequest { instance: Some(self.instance_selector.clone()), patch: Some(InstanceConfigPatch { acl: Some(AclPatch { - udp_whitelist: whitelist, + tcp_whitelist: if is_tcp { + vec![StringPatch { + action: ConfigPatchAction::Clear.into(), + value: "".to_string(), + }] + } else { + vec![] + }, + udp_whitelist: if is_tcp { + vec![] + } else { + vec![StringPatch { + action: ConfigPatchAction::Clear.into(), + value: "".to_string(), + }] + }, ..Default::default() }), ..Default::default() @@ -1272,91 +1805,51 @@ impl CommandHandler<'_> { client .patch_config(BaseController::default(), request) .await?; - println!("UDP whitelist updated: {}", ports); Ok(()) } async fn handle_whitelist_clear_tcp(&self) -> Result<(), Error> { - let client = self.get_config_client().await?; - - let request = PatchConfigRequest { - instance: Some(self.instance_selector.clone()), - patch: Some(InstanceConfigPatch { - acl: Some(AclPatch { - tcp_whitelist: vec![StringPatch { - action: ConfigPatchAction::Clear.into(), - value: "".to_string(), - }], - ..Default::default() - }), - ..Default::default() - }), - }; - - client - .patch_config(BaseController::default(), request) + self.apply_to_instances(|handler| Box::pin(handler.apply_whitelist_clear(true))) .await?; println!("TCP whitelist cleared"); Ok(()) } async fn handle_whitelist_clear_udp(&self) -> Result<(), Error> { - let client = self.get_config_client().await?; - - let request = PatchConfigRequest { - instance: Some(self.instance_selector.clone()), - patch: Some(InstanceConfigPatch { - acl: Some(AclPatch { - udp_whitelist: vec![StringPatch { - action: ConfigPatchAction::Clear.into(), - value: "".to_string(), - }], - ..Default::default() - }), - ..Default::default() - }), - }; - - client - .patch_config(BaseController::default(), request) + self.apply_to_instances(|handler| Box::pin(handler.apply_whitelist_clear(false))) .await?; println!("UDP whitelist cleared"); Ok(()) } async fn handle_whitelist_show(&self) -> Result<(), Error> { - let client = self.get_acl_manager_client().await?; - let request = GetWhitelistRequest { - instance: Some(self.instance_selector.clone()), - }; - let response = client - .get_whitelist(BaseController::default(), request) + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_whitelist())) .await?; - if self.verbose || *self.output_format == OutputFormat::Json { - println!("{}", serde_json::to_string_pretty(&response)?); - return Ok(()); + return self.print_json_results(results); } - println!( - "TCP Whitelist: {}", - if response.tcp_ports.is_empty() { - "None".to_string() - } else { - response.tcp_ports.join(", ") - } - ); + self.print_results(&results, |response| { + println!( + "TCP Whitelist: {}", + if response.tcp_ports.is_empty() { + "None".to_string() + } else { + response.tcp_ports.join(", ") + } + ); - println!( - "UDP Whitelist: {}", - if response.udp_ports.is_empty() { - "None".to_string() - } else { - response.udp_ports.join(", ") - } - ); - - Ok(()) + println!( + "UDP Whitelist: {}", + if response.udp_ports.is_empty() { + "None".to_string() + } else { + response.udp_ports.join(", ") + } + ); + Ok(()) + }) } async fn handle_logger_get(&self) -> Result<(), Error> { @@ -1427,113 +1920,348 @@ impl CommandHandler<'_> { allow_relay: bool, allowed_proxy_cidrs: Vec, ) -> Result<(), Error> { - let client = self.get_credential_client().await?; - let request = GenerateCredentialRequest { - credential_id, - groups, - allow_relay, - allowed_proxy_cidrs, - ttl_seconds: ttl, - }; - let response = client - .generate_credential(BaseController::default(), request) + let results = self + .collect_instance_results(|handler| { + let credential_id = credential_id.clone(); + let groups = groups.clone(); + let allowed_proxy_cidrs = allowed_proxy_cidrs.clone(); + Box::pin(async move { + handler + .get_credential_client() + .await? + .generate_credential( + BaseController::default(), + GenerateCredentialRequest { + credential_id, + groups, + allow_relay, + allowed_proxy_cidrs, + ttl_seconds: ttl, + instance: Some(handler.instance_selector.clone()), + }, + ) + .await + .map_err(Into::into) + }) + }) .await?; - match self.output_format { - OutputFormat::Table => { - println!("Credential generated successfully:"); - println!(" credential_id: {}", response.credential_id); - println!(" credential_secret: {}", response.credential_secret); - println!(); - println!("To use this credential on a new node:"); - println!( - " easytier-core --network-name --secure-mode --credential {} -p ", - response.credential_secret - ); - } - OutputFormat::Json => { - let json = serde_json::to_string_pretty(&response)?; - println!("{}", json); - } + if *self.output_format == OutputFormat::Json { + return self.print_json_results(results); } - Ok(()) + self.print_results(&results, |response| { + println!("Credential generated successfully:"); + println!(" credential_id: {}", response.credential_id); + println!(" credential_secret: {}", response.credential_secret); + println!(); + println!("To use this credential on a new node:"); + println!( + " easytier-core --network-name --secure-mode --credential {} -p ", + response.credential_secret + ); + Ok(()) + }) } async fn handle_credential_revoke(&self, credential_id: &str) -> Result<(), Error> { - let client = self.get_credential_client().await?; - let request = RevokeCredentialRequest { - credential_id: credential_id.to_string(), - }; - let response = client - .revoke_credential(BaseController::default(), request) + let credential_id = credential_id.to_string(); + let results = self + .collect_instance_results(|handler| { + let credential_id = credential_id.clone(); + Box::pin(async move { + handler + .get_credential_client() + .await? + .revoke_credential( + BaseController::default(), + RevokeCredentialRequest { + credential_id, + instance: Some(handler.instance_selector.clone()), + }, + ) + .await + .map_err(Into::into) + }) + }) .await?; - match self.output_format { - OutputFormat::Table => { - if response.success { - println!("Credential revoked successfully"); - } else { - println!("Credential not found"); - } - } - OutputFormat::Json => { - let json = serde_json::to_string_pretty(&response)?; - println!("{}", json); - } + if *self.output_format == OutputFormat::Json { + return self.print_json_results(results); } - Ok(()) + self.print_results(&results, |response| { + if response.success { + println!("Credential revoked successfully"); + } else { + println!("Credential not found"); + } + Ok(()) + }) } async fn handle_credential_list(&self) -> Result<(), Error> { - let client = self.get_credential_client().await?; - let request = ListCredentialsRequest {}; - let response = client - .list_credentials(BaseController::default(), request) + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_credential_list())) .await?; - match self.output_format { - OutputFormat::Table => { - if response.credentials.is_empty() { - println!("No active credentials"); - } else { - use tabled::{builder::Builder, settings::Style}; - let mut builder = Builder::default(); - builder.push_record(["ID", "Groups", "Relay", "Expiry", "Allowed CIDRs"]); - for cred in &response.credentials { - let expiry = { - let secs = cred.expiry_unix; - let remaining = secs - - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs() as i64; - if remaining > 0 { - format!("{}s remaining", remaining) - } else { - "expired".to_string() - } - }; - builder.push_record([ - &cred.credential_id[..], - &cred.groups.join(","), - if cred.allow_relay { "yes" } else { "no" }, - &expiry, - &cred.allowed_proxy_cidrs.join(","), - ]); - } - let table = builder.build().with(Style::rounded()).to_string(); - println!("{}", table); - } - } - OutputFormat::Json => { - let json = serde_json::to_string_pretty(&response)?; - println!("{}", json); - } + if *self.output_format == OutputFormat::Json { + return self.print_json_results(results); } - Ok(()) + self.print_results(&results, |response| { + if response.credentials.is_empty() { + println!("No active credentials"); + } else { + use tabled::{builder::Builder, settings::Style}; + let mut builder = Builder::default(); + builder.push_record(["ID", "Groups", "Relay", "Expiry", "Allowed CIDRs"]); + for cred in &response.credentials { + let expiry = { + let secs = cred.expiry_unix; + let remaining = secs + - std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + if remaining > 0 { + format!("{}s remaining", remaining) + } else { + "expired".to_string() + } + }; + builder.push_record([ + &cred.credential_id[..], + &cred.groups.join(","), + if cred.allow_relay { "yes" } else { "no" }, + &expiry, + &cred.allowed_proxy_cidrs.join(","), + ]); + } + let table = builder.build().with(Style::rounded()).to_string(); + println!("{}", table); + } + Ok(()) + }) + } + + async fn handle_peer_center(&self) -> Result<(), Error> { + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_peer_center_rows())) + .await?; + + if *self.output_format == OutputFormat::Json { + return self.print_json_results(results); + } + + #[derive(tabled::Tabled, serde::Serialize)] + struct PeerCenterTableItem { + node_id: String, + hostname: String, + ipv4: String, + #[tabled(rename = "direct_peers")] + direct_peers_str: String, + } + + self.print_results(&results, |rows| { + let table_rows = rows + .iter() + .map(|row| PeerCenterTableItem { + node_id: row.node_id.clone(), + hostname: row.hostname.clone(), + ipv4: row.ipv4.clone(), + direct_peers_str: row + .direct_peers + .iter() + .map(|x| { + format!( + "{}({}[{}]): {}ms", + x.node_id, x.hostname, x.ipv4, x.latency_ms, + ) + }) + .collect::>() + .join("\n"), + }) + .collect::>(); + print_output( + &table_rows, + self.output_format, + &["direct_peers"], + &["direct_peers"], + self.no_trunc, + ) + }) + } + + async fn handle_vpn_portal(&self) -> Result<(), Error> { + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_vpn_portal_info())) + .await?; + + if *self.output_format == OutputFormat::Json { + return self.print_json_results(results); + } + + self.print_results(&results, |resp| { + println!("portal_name: {}", resp.vpn_type); + println!( + r#" +############### client_config_start ############### +{} +############### client_config_end ############### +"#, + resp.client_config + ); + println!("connected_clients:\n{:#?}", resp.connected_clients); + Ok(()) + }) + } + + async fn handle_node(&self, sub_command: Option<&NodeSubCommand>) -> Result<(), Error> { + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_node_info())) + .await?; + + if self.verbose || *self.output_format == OutputFormat::Json { + return match sub_command { + Some(NodeSubCommand::Config) => self.print_json_results( + results + .into_iter() + .map(|result| result.map(|node| node.config)) + .collect(), + ), + _ => self.print_json_results(results), + }; + } + + self.print_results(&results, |node_info| match sub_command { + Some(NodeSubCommand::Config) => { + println!("{}", node_info.config); + Ok(()) + } + Some(NodeSubCommand::Info) | None => { + let stun_info = node_info.stun_info.clone().unwrap_or_default(); + let ip_list = node_info.ip_list.clone().unwrap_or_default(); + + let mut builder = tabled::builder::Builder::default(); + builder.push_record(vec!["Virtual IP", node_info.ipv4_addr.as_str()]); + builder.push_record(vec!["Hostname", node_info.hostname.as_str()]); + builder.push_record(vec![ + "Proxy CIDRs", + node_info.proxy_cidrs.join(", ").as_str(), + ]); + builder.push_record(vec!["Peer ID", node_info.peer_id.to_string().as_str()]); + stun_info.public_ip.iter().for_each(|ip| { + let Ok(ip) = ip.parse::() else { + return; + }; + if ip.is_ipv4() { + builder.push_record(vec!["Public IPv4", ip.to_string().as_str()]); + } else { + builder.push_record(vec!["Public IPv6", ip.to_string().as_str()]); + } + }); + builder.push_record(vec![ + "UDP Stun Type", + format!("{:?}", stun_info.udp_nat_type()).as_str(), + ]); + ip_list.interface_ipv4s.iter().for_each(|ip| { + builder.push_record(vec!["Interface IPv4", ip.to_string().as_str()]); + }); + ip_list.interface_ipv6s.iter().for_each(|ip| { + builder.push_record(vec!["Interface IPv6", ip.to_string().as_str()]); + }); + for (idx, l) in node_info.listeners.iter().enumerate() { + if l.starts_with("ring") { + continue; + } + builder.push_record(vec![format!("Listener {}", idx).as_str(), l]); + } + + println!("{}", builder.build().with(Style::markdown())); + Ok(()) + } + }) + } + + async fn handle_stats_show(&self) -> Result<(), Error> { + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_stats())) + .await?; + + if *self.output_format == OutputFormat::Json { + return self.print_json_results(results); + } + + #[derive(tabled::Tabled, serde::Serialize)] + struct StatsTableRow { + #[tabled(rename = "Metric Name")] + name: String, + #[tabled(rename = "Value")] + value: String, + #[tabled(rename = "Labels")] + labels: String, + } + + self.print_results(&results, |metrics| { + let table_rows: Vec = metrics + .iter() + .map(|metric| { + let labels_str = if metric.labels.is_empty() { + "-".to_string() + } else { + metric + .labels + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>() + .join(", ") + }; + + let formatted_value = if metric.name.contains("bytes") { + format_size(metric.value, humansize::BINARY) + } else if metric.name.contains("duration") { + format!("{} ms", metric.value) + } else { + metric.value.to_string() + }; + + StatsTableRow { + name: metric.name.clone(), + value: formatted_value, + labels: labels_str, + } + }) + .collect(); + + print_output( + &table_rows, + self.output_format, + &["labels"], + &["labels"], + self.no_trunc, + ) + }) + } + + async fn handle_stats_prometheus(&self) -> Result<(), Error> { + let results = self + .collect_instance_results(|handler| Box::pin(handler.fetch_prometheus_stats())) + .await?; + + if *self.output_format == OutputFormat::Json { + return self.print_json_results( + results + .into_iter() + .map(|result| result.map(|text| serde_json::json!({ "prometheus_text": text }))) + .collect(), + ); + } + + self.print_results(&results, |text| { + println!("{}", text); + Ok(()) + }) } fn parse_port_list(ports_str: &str) -> Result, Error> { @@ -1800,11 +2528,13 @@ async fn main() -> Result<(), Error> { .unwrap(), )); let handler = CommandHandler { - client: tokio::sync::Mutex::new(client), + client: Arc::new(tokio::sync::Mutex::new(client)), verbose: cli.verbose, output_format: &cli.output_format, no_trunc: cli.no_trunc, + instance_select: &cli.instance_select, instance_selector: (&cli.instance_select).into(), + resolved_target: None, }; match cli.sub_command { @@ -1890,218 +2620,13 @@ async fn main() -> Result<(), Error> { .unwrap(); } SubCommand::PeerCenter => { - let peer_center_client = handler.get_peer_center_client().await?; - let resp = peer_center_client - .get_global_peer_map( - BaseController::default(), - GetGlobalPeerMapRequest::default(), - ) - .await?; - let route_infos = handler.list_peer_route_pair().await?; - struct PeerCenterNodeInfo { - hostname: String, - ipv4: String, - } - let node_id_to_node_info = DashMap::new(); - let node_info = handler - .get_peer_manager_client() - .await? - .show_node_info( - BaseController::default(), - ShowNodeInfoRequest { - instance: Some((&cli.instance_select).into()), - }, - ) - .await? - .node_info - .ok_or(anyhow::anyhow!("node info not found"))?; - node_id_to_node_info.insert( - node_info.peer_id, - PeerCenterNodeInfo { - hostname: node_info.hostname.clone(), - ipv4: node_info.ipv4_addr, - }, - ); - for route_info in route_infos { - let Some(peer_id) = route_info.route.as_ref().map(|x| x.peer_id) else { - continue; - }; - node_id_to_node_info.insert( - peer_id, - PeerCenterNodeInfo { - hostname: route_info - .route - .as_ref() - .map(|x| x.hostname.clone()) - .unwrap_or_default(), - ipv4: route_info - .route - .as_ref() - .and_then(|x| x.ipv4_addr) - .map(|x| x.to_string()) - .unwrap_or_default(), - }, - ); - } - - #[derive(tabled::Tabled, serde::Serialize)] - struct PeerCenterTableItem { - node_id: String, - hostname: String, - ipv4: String, - #[tabled(rename = "direct_peers")] - #[serde(skip_serializing)] - direct_peers_str: String, - #[tabled(skip)] - direct_peers: Vec, - } - - #[derive(serde::Serialize)] - struct DirectPeerItem { - node_id: String, - hostname: String, - ipv4: String, - latency_ms: i32, - } - - let mut table_rows = vec![]; - for (k, v) in resp.global_peer_map.iter() { - let node_id = k; - let direct_peers: Vec<_> = v - .direct_peers - .iter() - .map(|(k, v)| DirectPeerItem { - node_id: k.to_string(), - hostname: node_id_to_node_info - .get(k) - .map(|x| x.hostname.clone()) - .unwrap_or_default(), - ipv4: node_id_to_node_info - .get(k) - .map(|x| x.ipv4.clone()) - .unwrap_or_default(), - latency_ms: v.latency_ms, - }) - .collect(); - let direct_peers_strs = direct_peers - .iter() - .map(|x| { - format!( - "{}({}[{}]): {}ms", - x.node_id, x.hostname, x.ipv4, x.latency_ms, - ) - }) - .collect::>(); - - table_rows.push(PeerCenterTableItem { - node_id: node_id.to_string(), - hostname: node_id_to_node_info - .get(node_id) - .map(|x| x.hostname.clone()) - .unwrap_or_default(), - ipv4: node_id_to_node_info - .get(node_id) - .map(|x| x.ipv4.clone()) - .unwrap_or_default(), - direct_peers_str: direct_peers_strs.join("\n"), - direct_peers, - }); - } - - print_output( - &table_rows, - &cli.output_format, - &["direct_peers"], - &["direct_peers"], - cli.no_trunc, - )?; + handler.handle_peer_center().await?; } SubCommand::VpnPortal => { - let vpn_portal_client = handler.get_vpn_portal_client().await?; - let resp = vpn_portal_client - .get_vpn_portal_info( - BaseController::default(), - GetVpnPortalInfoRequest { - instance: Some((&cli.instance_select).into()), - }, - ) - .await? - .vpn_portal_info - .unwrap_or_default(); - println!("portal_name: {}", resp.vpn_type); - println!( - r#" -############### client_config_start ############### -{} -############### client_config_end ############### -"#, - resp.client_config - ); - println!("connected_clients:\n{:#?}", resp.connected_clients); + handler.handle_vpn_portal().await?; } SubCommand::Node(sub_cmd) => { - let client = handler.get_peer_manager_client().await?; - let node_info = client - .show_node_info( - BaseController::default(), - ShowNodeInfoRequest { - instance: Some((&cli.instance_select).into()), - }, - ) - .await? - .node_info - .ok_or(anyhow::anyhow!("node info not found"))?; - match sub_cmd.sub_command { - Some(NodeSubCommand::Info) | None => { - if cli.verbose || cli.output_format == OutputFormat::Json { - println!("{}", serde_json::to_string_pretty(&node_info)?); - return Ok(()); - } - - let stun_info = node_info.stun_info.clone().unwrap_or_default(); - let ip_list = node_info.ip_list.clone().unwrap_or_default(); - - let mut builder = tabled::builder::Builder::default(); - builder.push_record(vec!["Virtual IP", node_info.ipv4_addr.as_str()]); - builder.push_record(vec!["Hostname", node_info.hostname.as_str()]); - builder.push_record(vec![ - "Proxy CIDRs", - node_info.proxy_cidrs.join(", ").as_str(), - ]); - builder.push_record(vec!["Peer ID", node_info.peer_id.to_string().as_str()]); - stun_info.public_ip.iter().for_each(|ip| { - let Ok(ip) = ip.parse::() else { - return; - }; - if ip.is_ipv4() { - builder.push_record(vec!["Public IPv4", ip.to_string().as_str()]); - } else { - builder.push_record(vec!["Public IPv6", ip.to_string().as_str()]); - } - }); - builder.push_record(vec![ - "UDP Stun Type", - format!("{:?}", stun_info.udp_nat_type()).as_str(), - ]); - ip_list.interface_ipv4s.iter().for_each(|ip| { - builder.push_record(vec!["Interface IPv4", ip.to_string().as_str()]); - }); - ip_list.interface_ipv6s.iter().for_each(|ip| { - builder.push_record(vec!["Interface IPv6", ip.to_string().as_str()]); - }); - for (idx, l) in node_info.listeners.iter().enumerate() { - if l.starts_with("ring") { - continue; - } - builder.push_record(vec![format!("Listener {}", idx).as_str(), l]); - } - - println!("{}", builder.build().with(Style::markdown())); - } - Some(NodeSubCommand::Config) => { - println!("{}", node_info.config); - } - } + handler.handle_node(sub_cmd.sub_command.as_ref()).await?; } SubCommand::Service(service_args) => { let service = Service::new(service_args.name)?; @@ -2287,75 +2812,10 @@ async fn main() -> Result<(), Error> { }, SubCommand::Stats(stats_args) => match &stats_args.sub_command { Some(StatsSubCommand::Show) | None => { - let client = handler.get_stats_client().await?; - let request = GetStatsRequest { - instance: Some((&cli.instance_select).into()), - }; - let response = client.get_stats(BaseController::default(), request).await?; - - if cli.output_format == OutputFormat::Json { - println!("{}", serde_json::to_string_pretty(&response.metrics)?); - } else { - #[derive(tabled::Tabled, serde::Serialize)] - struct StatsTableRow { - #[tabled(rename = "Metric Name")] - name: String, - #[tabled(rename = "Value")] - value: String, - #[tabled(rename = "Labels")] - labels: String, - } - - let table_rows: Vec = response - .metrics - .iter() - .map(|metric| { - let labels_str = if metric.labels.is_empty() { - "-".to_string() - } else { - metric - .labels - .iter() - .map(|(k, v)| format!("{}={}", k, v)) - .collect::>() - .join(", ") - }; - - let formatted_value = if metric.name.contains("bytes") { - format_size(metric.value, humansize::BINARY) - } else if metric.name.contains("duration") { - format!("{} ms", metric.value) - } else { - metric.value.to_string() - }; - - StatsTableRow { - name: metric.name.clone(), - value: formatted_value, - labels: labels_str, - } - }) - .collect(); - - print_output( - &table_rows, - &cli.output_format, - &["labels"], - &["labels"], - cli.no_trunc, - )? - } + handler.handle_stats_show().await?; } Some(StatsSubCommand::Prometheus) => { - let client = handler.get_stats_client().await?; - let request = GetPrometheusStatsRequest { - instance: Some((&cli.instance_select).into()), - }; - let response = client - .get_prometheus_stats(BaseController::default(), request) - .await?; - - println!("{}", response.prometheus_text); + handler.handle_stats_prometheus().await?; } }, SubCommand::Logger(logger_args) => match &logger_args.sub_command { diff --git a/easytier/src/instance_manager.rs b/easytier/src/instance_manager.rs index 9e01ec2e..45974452 100644 --- a/easytier/src/instance_manager.rs +++ b/easytier/src/instance_manager.rs @@ -192,7 +192,13 @@ impl NetworkInstanceManager { self.instance_map.iter().map(|item| *item.key()).collect() } - pub fn get_network_instance_name(&self, instance_id: &uuid::Uuid) -> Option { + pub fn get_instance_name(&self, instance_id: &uuid::Uuid) -> Option { + self.instance_map + .get(instance_id) + .map(|instance| instance.value().get_inst_name()) + } + + pub fn get_network_name(&self, instance_id: &uuid::Uuid) -> Option { self.instance_map .get(instance_id) .map(|instance| instance.value().get_network_name()) diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index fa46d7bb..1b97e243 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -1,4 +1,4 @@ -use crate::common::config::{ConfigFileControl, PortForwardConfig}; +use crate::common::config::{process_secure_mode_cfg, ConfigFileControl, PortForwardConfig}; use crate::proto::api::{self, manage}; use crate::proto::rpc_types::controller::BaseController; use crate::rpc_service::InstanceRpcService; @@ -509,10 +509,29 @@ impl NetworkConfig { cfg.set_hostname(self.hostname.clone()); cfg.set_dhcp(self.dhcp.unwrap_or_default()); cfg.set_inst_name(self.network_name.clone().unwrap_or_default()); - cfg.set_network_identity(NetworkIdentity::new( - self.network_name.clone().unwrap_or_default(), - self.network_secret.clone().unwrap_or_default(), - )); + + // The web UI does not expose credential inputs directly, but imported/saved + // NetworkConfig objects still need to preserve credential-mode instances via + // secure_mode.local_private_key + empty network_secret. + let credential_secret = if self.network_secret.is_some() { + None + } else { + self.secure_mode + .as_ref() + .and_then(|mode| mode.local_private_key.clone()) + .filter(|s| !s.is_empty()) + }; + + if credential_secret.is_some() { + cfg.set_network_identity(NetworkIdentity::new_credential( + self.network_name.clone().unwrap_or_default(), + )); + } else { + cfg.set_network_identity(NetworkIdentity::new( + self.network_name.clone().unwrap_or_default(), + self.network_secret.clone().unwrap_or_default(), + )); + } if !cfg.get_dhcp() { let virtual_ipv4 = self.virtual_ipv4.clone().unwrap_or_default(); @@ -677,7 +696,30 @@ impl NetworkConfig { )); } - cfg.set_secure_mode(self.secure_mode.clone()); + if let Some(credential_file) = self + .credential_file + .as_ref() + .filter(|path| !path.is_empty()) + { + cfg.set_credential_file(Some(credential_file.into())); + } + + if let Some(credential_secret) = credential_secret { + cfg.set_secure_mode(Some(process_secure_mode_cfg( + crate::proto::common::SecureModeConfig { + enabled: true, + local_private_key: Some(credential_secret), + local_public_key: None, + }, + )?)); + } else { + cfg.set_secure_mode( + self.secure_mode + .clone() + .map(process_secure_mode_cfg) + .transpose()?, + ); + } let mut flags = gen_default_flags(); if let Some(latency_first) = self.latency_first { @@ -900,7 +942,9 @@ impl NetworkConfig { } result.secure_mode = config.get_secure_mode(); - + result.credential_file = config + .get_credential_file() + .map(|path| path.to_string_lossy().into_owned()); let flags = config.get_flags(); result.latency_first = Some(flags.latency_first); result.dev_name = Some(flags.dev_name.clone()); @@ -947,7 +991,11 @@ impl NetworkConfig { #[cfg(test)] mod tests { - use crate::{common::config::ConfigLoader, proto::common::SecureModeConfig}; + use crate::{ + common::config::{process_secure_mode_cfg, ConfigLoader}, + proto::common::SecureModeConfig, + }; + use base64::prelude::{Engine as _, BASE64_STANDARD}; use rand::Rng; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; @@ -1195,6 +1243,10 @@ mod tests { config.set_flags(flags); } + if let Some(secure_mode) = config.get_secure_mode() { + config.set_secure_mode(Some(process_secure_mode_cfg(secure_mode)?)); + } + let network_config = super::NetworkConfig::new_from_config(&config)?; let generated_config = network_config.gen_config()?; generated_config.set_peers(generated_config.get_peers()); // Ensure peers field is not None @@ -1211,4 +1263,59 @@ mod tests { Ok(()) } + + #[test] + fn test_network_config_conversion_credential_mode() -> Result<(), anyhow::Error> { + let private_key = x25519_dalek::StaticSecret::from([7u8; 32]); + let public_key = x25519_dalek::PublicKey::from(&private_key); + let credential_secret = BASE64_STANDARD.encode(private_key.as_bytes()); + let credential_file = "/tmp/easytier-credentials.json".to_string(); + + let config = gen_default_config(); + config.set_network_identity(crate::common::config::NetworkIdentity::new_credential( + "credential-net".to_string(), + )); + config.set_inst_name("credential-net".to_string()); + config.set_credential_file(Some(credential_file.clone().into())); + config.set_secure_mode(Some(SecureModeConfig { + enabled: true, + local_private_key: Some(credential_secret.clone()), + local_public_key: Some(BASE64_STANDARD.encode(public_key.as_bytes())), + })); + + let network_config = super::NetworkConfig::new_from_config(&config)?; + assert_eq!( + network_config.credential_file.as_deref(), + Some(credential_file.as_str()) + ); + assert_eq!(network_config.network_secret, None); + assert_eq!( + network_config + .secure_mode + .as_ref() + .and_then(|mode| mode.local_private_key.as_deref()), + Some(credential_secret.as_str()) + ); + + let generated_config = network_config.gen_config()?; + assert_eq!( + generated_config.get_network_identity().network_secret, + None, + "credential mode should not be converted back into network_secret mode" + ); + assert_eq!( + generated_config + .get_credential_file() + .map(|path| path.to_string_lossy().into_owned()), + Some(credential_file) + ); + assert_eq!( + generated_config + .get_secure_mode() + .and_then(|mode| mode.local_private_key), + Some(credential_secret) + ); + + Ok(()) + } } diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 903b713d..7d97f7af 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -617,6 +617,16 @@ pub struct ForeignNetworkManager { } impl ForeignNetworkManager { + async fn is_shared_pubkey_trusted( + entry: &ForeignNetworkEntry, + remote_static_pubkey: &[u8], + ) -> bool { + remote_static_pubkey.len() == 32 + && entry + .global_ctx + .is_pubkey_trusted(remote_static_pubkey, &entry.network.network_name) + } + pub fn new( my_peer_id: PeerId, global_ctx: ArcGlobalCtx, @@ -655,12 +665,14 @@ impl ForeignNetworkManager { } pub async fn add_peer_conn(&self, peer_conn: PeerConn) -> Result<(), Error> { - tracing::info!(peer_conn = ?peer_conn.get_conn_info(), network = ?peer_conn.get_network_identity(), "add new peer conn in foreign network manager"); + let conn_info = peer_conn.get_conn_info(); + let peer_network = peer_conn.get_network_identity(); + tracing::info!(peer_conn = ?conn_info, network = ?peer_network, "add new peer conn in foreign network manager"); let relay_peer_rpc = self.global_ctx.get_flags().relay_all_peer_rpc; let ret = self .global_ctx - .check_network_in_whitelist(&peer_conn.get_network_identity().network_name) + .check_network_in_whitelist(&peer_network.network_name) .map_err(Into::into); if ret.is_err() && !relay_peer_rpc { return ret; @@ -669,7 +681,7 @@ impl ForeignNetworkManager { let (entry, new_added) = self .data .get_or_insert_entry( - &peer_conn.get_network_identity(), + &peer_network, peer_conn.get_my_peer_id(), peer_conn.get_peer_id(), ret.is_ok(), @@ -679,10 +691,14 @@ impl ForeignNetworkManager { ) .await; + let same_identity = entry.network == peer_network; + let shared_peer = peer_conn.get_peer_identity_type() == PeerIdentityType::SharedNode; + let shared_peer_trusted = shared_peer + && Self::is_shared_pubkey_trusted(&entry, &conn_info.noise_remote_static_pubkey).await; + let _g = entry.lock.lock().await; - if (entry.network != peer_conn.get_network_identity() - && peer_conn.get_peer_identity_type() != PeerIdentityType::SharedNode) + if (!(same_identity || shared_peer_trusted)) || entry.my_peer_id != peer_conn.get_my_peer_id() { if new_added { @@ -697,9 +713,11 @@ impl ForeignNetworkManager { ) } else { anyhow::anyhow!( - "network secret not match. exp: {:?} real: {:?}", + "foreign peer identity not trusted. exp: {:?} real: {:?}, remote_pubkey_len: {}, shared_trusted: {}", entry.network, - peer_conn.get_network_identity() + peer_network, + conn_info.noise_remote_static_pubkey.len(), + shared_peer_trusted, ) }; tracing::error!(?err, "foreign network entry not match, disconnect peer"); diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 5c1491b1..b2327c72 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -1441,9 +1441,9 @@ impl PeerConn { let info = self.info.as_ref().unwrap(); let mut ret = NetworkIdentity { network_name: info.network_name.clone(), - ..Default::default() + network_secret: None, + network_secret_digest: Some([0u8; 32]), }; - ret.network_secret_digest = Some([0u8; 32]); ret.network_secret_digest .as_mut() .unwrap() @@ -1619,7 +1619,15 @@ pub mod tests { assert_eq!(c_peer.get_peer_id(), s_peer_id); assert_eq!(s_peer.get_peer_id(), c_peer_id); assert_eq!(c_peer.get_network_identity(), s_peer.get_network_identity()); - assert_eq!(c_peer.get_network_identity(), NetworkIdentity::default()); + assert_eq!( + c_peer.get_network_identity().network_name, + NetworkIdentity::default().network_name + ); + assert_eq!(c_peer.get_network_identity().network_secret, None); + assert_eq!( + c_peer.get_network_identity().network_secret_digest, + NetworkIdentity::default().network_secret_digest + ); } #[tokio::test] diff --git a/easytier/src/proto/api_instance.proto b/easytier/src/proto/api_instance.proto index 041f29fe..16042fbb 100644 --- a/easytier/src/proto/api_instance.proto +++ b/easytier/src/proto/api_instance.proto @@ -301,6 +301,7 @@ message GenerateCredentialRequest { repeated string allowed_proxy_cidrs = 3; // optional: restrict proxy_cidrs int64 ttl_seconds = 4; // must be > 0: credential TTL in seconds (0 / omitted is invalid) optional string credential_id = 5; // optional: user-specified credential id, reused if already exists + InstanceIdentifier instance = 6; // target network instance } message GenerateCredentialResponse { @@ -310,13 +311,16 @@ message GenerateCredentialResponse { message RevokeCredentialRequest { string credential_id = 1; + InstanceIdentifier instance = 2; // target network instance } message RevokeCredentialResponse { bool success = 1; } -message ListCredentialsRequest {} +message ListCredentialsRequest { + InstanceIdentifier instance = 1; // target network instance +} message CredentialInfo { string credential_id = 1; // UUID diff --git a/easytier/src/proto/api_manage.proto b/easytier/src/proto/api_manage.proto index 36e75c26..55461d99 100644 --- a/easytier/src/proto/api_manage.proto +++ b/easytier/src/proto/api_manage.proto @@ -83,6 +83,8 @@ message NetworkConfig { optional bool disable_tcp_hole_punching = 54; common.SecureModeConfig secure_mode = 55; + reserved 56; + optional string credential_file = 57; } message PortForwardConfig { @@ -124,6 +126,7 @@ message NetworkMeta { common.UUID inst_id = 1; string network_name = 2; uint32 config_permission = 3; + string instance_name = 4; } message ValidateConfigRequest { NetworkConfig config = 1; } diff --git a/easytier/src/rpc_service/credential_manage.rs b/easytier/src/rpc_service/credential_manage.rs index 5b13d0bc..716d979a 100644 --- a/easytier/src/rpc_service/credential_manage.rs +++ b/easytier/src/rpc_service/credential_manage.rs @@ -32,7 +32,7 @@ impl CredentialManageRpc for CredentialManageRpcService { ctrl: Self::Controller, req: GenerateCredentialRequest, ) -> crate::proto::rpc_types::error::Result { - super::get_instance_service(&self.instance_manager, &None)? + super::get_instance_service(&self.instance_manager, &req.instance)? .get_credential_manage_service() .generate_credential(ctrl, req) .await @@ -43,7 +43,7 @@ impl CredentialManageRpc for CredentialManageRpcService { ctrl: Self::Controller, req: RevokeCredentialRequest, ) -> crate::proto::rpc_types::error::Result { - super::get_instance_service(&self.instance_manager, &None)? + super::get_instance_service(&self.instance_manager, &req.instance)? .get_credential_manage_service() .revoke_credential(ctrl, req) .await @@ -54,7 +54,7 @@ impl CredentialManageRpc for CredentialManageRpcService { ctrl: Self::Controller, req: ListCredentialsRequest, ) -> crate::proto::rpc_types::error::Result { - super::get_instance_service(&self.instance_manager, &None)? + super::get_instance_service(&self.instance_manager, &req.instance)? .get_credential_manage_service() .list_credentials(ctrl, req) .await diff --git a/easytier/src/rpc_service/instance_manage.rs b/easytier/src/rpc_service/instance_manage.rs index 803c4487..4562344e 100644 --- a/easytier/src/rpc_service/instance_manage.rs +++ b/easytier/src/rpc_service/instance_manage.rs @@ -276,13 +276,17 @@ impl WebClientService for InstanceManageRpcService { let Some(control) = self.manager.get_instance_config_control(&inst_id) else { continue; }; - let Some(name) = self.manager.get_network_instance_name(&inst_id) else { + let Some(network_name) = self.manager.get_network_name(&inst_id) else { + continue; + }; + let Some(instance_name) = self.manager.get_instance_name(&inst_id) else { continue; }; let meta = NetworkMeta { inst_id: Some(inst_id.into()), - network_name: name, + network_name, config_permission: control.permission.into(), + instance_name, }; metas.push(meta); } diff --git a/easytier/src/rpc_service/logger.rs b/easytier/src/rpc_service/logger.rs index 197abc96..23a73cde 100644 --- a/easytier/src/rpc_service/logger.rs +++ b/easytier/src/rpc_service/logger.rs @@ -50,13 +50,6 @@ impl LoggerRpc for LoggerRpcService { ) -> Result { let level_str = Self::log_level_to_string(request.level()); - // 更新当前日志级别 - if let Some(current_level) = CURRENT_LOG_LEVEL.get() { - if let Ok(mut level) = current_level.lock() { - *level = level_str.clone(); - } - } - // 发送新的日志级别到 logger 重载器 if let Some(sender) = LOGGER_LEVEL_SENDER.get() { if let Ok(sender) = sender.lock() { @@ -78,6 +71,13 @@ impl LoggerRpc for LoggerRpcService { ))); } + // 更新当前日志级别 + if let Some(current_level) = CURRENT_LOG_LEVEL.get() { + if let Ok(mut level) = current_level.lock() { + *level = Self::log_level_to_string(request.level()); + } + } + Ok(SetLoggerConfigResponse {}) } diff --git a/easytier/src/rpc_service/remote_client.rs b/easytier/src/rpc_service/remote_client.rs index 45a4fe10..e40fcbcf 100644 --- a/easytier/src/rpc_service/remote_client.rs +++ b/easytier/src/rpc_service/remote_client.rs @@ -234,12 +234,14 @@ where let config = self .handle_get_network_config(identify.clone(), instance_id) .await?; + let network_name = config.network_name.unwrap_or_default(); metas.insert( instance_id, NetworkMeta { inst_id: Some(instance_id.into()), - network_name: config.network_name.unwrap_or_default(), + network_name: network_name.clone(), config_permission: 0, + instance_name: network_name, }, ); } diff --git a/easytier/src/tests/credential_tests.rs b/easytier/src/tests/credential_tests.rs index 650daedc..82eb0d4f 100644 --- a/easytier/src/tests/credential_tests.rs +++ b/easytier/src/tests/credential_tests.rs @@ -123,6 +123,28 @@ async fn create_credential_config( config } +/// Helper: Create credential node config with a random, unknown key +fn create_unknown_credential_config( + network_name: String, + inst_name: &str, + ns: Option<&str>, + ipv4: &str, + ipv6: &str, +) -> TomlConfigLoader { + let random_private = x25519_dalek::StaticSecret::random_from_rng(rand::rngs::OsRng); + + let config = TomlConfigLoader::default(); + config.set_inst_name(inst_name.to_owned()); + config.set_netns(ns.map(|s| s.to_owned())); + config.set_ipv4(Some(ipv4.parse().unwrap())); + config.set_ipv6(Some(ipv6.parse().unwrap())); + config.set_listeners(vec![]); + config.set_network_identity(NetworkIdentity::new_credential(network_name)); + config.set_secure_mode(Some(generate_secure_mode_config_with_key(&random_private))); + + config +} + /// Helper: Create admin node config fn create_admin_config( inst_name: &str, @@ -809,6 +831,113 @@ async fn credential_unknown_rejected() { drop_insts(vec![admin_inst, cred_inst]).await; } +/// Regression test: an unknown credential must still be rejected when it first connects via a +/// shared node. If this fails, the shared path is incorrectly admitting the node into the target +/// network's route domain. +#[tokio::test] +#[serial_test::serial] +async fn credential_unknown_via_shared_rejected() { + prepare_credential_network(); + + let admin_a_config = + create_admin_config("admin_a", Some("ns_adm"), "10.144.144.1", "fd00::1/64"); + let mut admin_a_inst = Instance::new(admin_a_config); + admin_a_inst.run().await.unwrap(); + + let shared_b_config = + create_shared_config("shared_b", Some("ns_c1"), "10.144.144.2", "fd00::2/64"); + let mut shared_b_inst = Instance::new(shared_b_config); + shared_b_inst.run().await.unwrap(); + + let admin_c_config = + create_admin_config("admin_c", Some("ns_c3"), "10.144.144.4", "fd00::4/64"); + let mut admin_c_inst = Instance::new(admin_c_config); + admin_c_inst.run().await.unwrap(); + + admin_a_inst + .get_conn_manager() + .add_connector(TcpTunnelConnector::new( + "tcp://10.1.1.2:11010".parse().unwrap(), + )); + admin_c_inst + .get_conn_manager() + .add_connector(TcpTunnelConnector::new( + "tcp://10.1.1.2:11010".parse().unwrap(), + )); + + let admin_c_peer_id = admin_c_inst.peer_id(); + wait_for_condition( + || async { + let a_routes = admin_a_inst.get_peer_manager().list_routes().await; + let c_routes = admin_c_inst.get_peer_manager().list_routes().await; + a_routes.iter().any(|r| r.peer_id == admin_c_peer_id) + || c_routes.iter().any(|r| r.peer_id == admin_a_inst.peer_id()) + }, + Duration::from_secs(10), + ) + .await; + + let unknown_config = create_unknown_credential_config( + admin_a_inst + .get_global_ctx() + .get_network_identity() + .network_name + .clone(), + "unknown_d", + Some("ns_c2"), + "10.144.144.5", + "fd00::5/64", + ); + let mut unknown_inst = Instance::new(unknown_config); + unknown_inst.run().await.unwrap(); + + unknown_inst + .get_conn_manager() + .add_connector(TcpTunnelConnector::new( + "tcp://10.1.1.2:11010".parse().unwrap(), + )); + + let unknown_peer_id = unknown_inst.peer_id(); + + println!("unknown_peer_id: {:?}", unknown_peer_id); + + for _ in 0..5 { + let admin_a_routes = admin_a_inst.get_peer_manager().list_routes().await; + let admin_c_routes = admin_c_inst.get_peer_manager().list_routes().await; + + assert!( + !admin_a_routes.iter().any(|r| r.peer_id == unknown_peer_id), + "unknown credential unexpectedly appeared in admin_a routes via shared path: {:?}", + admin_a_routes.iter().map(|r| r.peer_id).collect::>() + ); + assert!( + !admin_c_routes.iter().any(|r| r.peer_id == unknown_peer_id), + "unknown credential unexpectedly appeared in admin_c routes via shared path: {:?}", + admin_c_routes.iter().map(|r| r.peer_id).collect::>() + ); + assert!( + !ping_test("ns_adm", "10.144.144.5", None).await, + "admin_a unexpectedly reached unknown credential via shared path" + ); + assert!( + !ping_test("ns_c3", "10.144.144.5", None).await, + "admin_c unexpectedly reached unknown credential via shared path" + ); + + tokio::time::sleep(Duration::from_secs(1)).await; + } + + println!("drop all"); + + drop_insts(vec![ + admin_a_inst, + shared_b_inst, + admin_c_inst, + unknown_inst, + ]) + .await; +} + #[rstest::rstest] #[tokio::test] #[serial_test::serial] diff --git a/script/test-cli-multi-instance.sh b/script/test-cli-multi-instance.sh new file mode 100755 index 00000000..0ba8a193 --- /dev/null +++ b/script/test-cli-multi-instance.sh @@ -0,0 +1,269 @@ +#!/usr/bin/env bash + +set -euo pipefail + +SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd) +REPO_ROOT=$(CDPATH= cd -- "$SCRIPT_DIR/.." && pwd) + +CORE_BIN=${CORE_BIN:-"$REPO_ROOT/target/debug/easytier-core"} +CLI_BIN=${CLI_BIN:-"$REPO_ROOT/target/debug/easytier-cli"} +TMPDIR_PATH="" +CORE_PID="" +PYTHON_BIN=${PYTHON_BIN:-python3} + +print_section() { + printf '\n==> %s\n' "$1" +} + +print_output() { + local title="$1" + local content="$2" + printf -- '---- %s ----\n' "$title" + printf '%s\n' "$content" + printf -- '---- end %s ----\n' "$title" +} + +build_binaries() { + print_section "Building easytier-core and easytier-cli" + cargo build -p easytier --bin easytier-core --bin easytier-cli +} + +ensure_binaries() { + if [[ "${SKIP_BUILD:-0}" != "1" ]] || [[ ! -x "$CORE_BIN" ]] || [[ ! -x "$CLI_BIN" ]]; then + build_binaries + fi +} + +make_tmpdir() { + "$PYTHON_BIN" - <<'PY' +import tempfile +print(tempfile.mkdtemp(prefix="easytier-cli-e2e-")) +PY +} + +cleanup_tmpdir() { + TMPDIR_TO_DELETE="$1" "$PYTHON_BIN" - <<'PY' +import os +import shutil + +shutil.rmtree(os.environ["TMPDIR_TO_DELETE"], ignore_errors=True) +PY +} + +alloc_port() { + "$PYTHON_BIN" - <<'PY' +import socket + +sock = socket.socket() +sock.bind(("127.0.0.1", 0)) +print(sock.getsockname()[1]) +sock.close() +PY +} + +wait_for_cli() { + local rpc_port="$1" + local attempts=0 + while (( attempts < 50 )); do + if "$CLI_BIN" -p "127.0.0.1:${rpc_port}" -o json node >/dev/null 2>&1; then + return 0 + fi + attempts=$((attempts + 1)) + sleep 0.2 + done + return 1 +} + +run_cmd() { + local __var_name="$1" + local title="$2" + shift 2 + + print_section "$title" + printf '+' + for arg in "$@"; do + printf ' %q' "$arg" + done + printf '\n' + + local output + if ! output=$("$@" 2>&1); then + print_output "$title output" "$output" + return 1 + fi + + print_output "$title output" "$output" + printf -v "$__var_name" '%s' "$output" +} + +assert_text_output() { + local text_output="$1" + grep -F '== e2e-inst-a (' <<<"$text_output" >/dev/null + grep -F '== e2e-inst-b (' <<<"$text_output" >/dev/null +} + +assert_multi_instance_json() { + local json_payload="$1" + JSON_PAYLOAD="$json_payload" "$PYTHON_BIN" - <<'PY' +import json +import os + +data = json.loads(os.environ["JSON_PAYLOAD"]) +assert isinstance(data, list), data +assert len(data) == 2, data + +names = {item["instance_name"] for item in data} +assert names == {"e2e-inst-a", "e2e-inst-b"}, names + +for item in data: + assert item["instance_id"], item + assert isinstance(item["result"], dict), item +PY +} + +assert_single_instance_json() { + local json_payload="$1" + JSON_PAYLOAD="$json_payload" "$PYTHON_BIN" - <<'PY' +import json +import os + +data = json.loads(os.environ["JSON_PAYLOAD"]) +assert isinstance(data, dict), data +assert data["config"].find('instance_name = "e2e-inst-a"') >= 0, data["config"] +PY +} + +assert_whitelist_fanout() { + local json_payload="$1" + JSON_PAYLOAD="$json_payload" "$PYTHON_BIN" - <<'PY' +import json +import os + +data = json.loads(os.environ["JSON_PAYLOAD"]) +assert len(data) == 2, data +for item in data: + assert item["result"]["tcp_ports"] == ["80", "443"], item + assert item["result"]["udp_ports"] == [], item +PY +} + +assert_single_instance_write() { + local json_payload="$1" + JSON_PAYLOAD="$json_payload" "$PYTHON_BIN" - <<'PY' +import json +import os + +data = {item["instance_name"]: item["result"] for item in json.loads(os.environ["JSON_PAYLOAD"])} +assert data["e2e-inst-a"]["tcp_ports"] == ["80", "443"], data +assert data["e2e-inst-b"]["tcp_ports"] == [], data +PY +} + +main() { + command -v "$PYTHON_BIN" >/dev/null 2>&1 || { + echo "python interpreter not found: $PYTHON_BIN" >&2 + exit 1 + } + + ensure_binaries + + TMPDIR_PATH=$(make_tmpdir) + print_section "Created temporary test directory" + printf '%s\n' "$TMPDIR_PATH" + + local rpc_port + rpc_port=$(alloc_port) + print_section "Allocated RPC port" + printf '%s\n' "$rpc_port" + + cleanup() { + if [[ -n "$CORE_PID" ]] && kill -0 "$CORE_PID" >/dev/null 2>&1; then + kill "$CORE_PID" >/dev/null 2>&1 || true + wait "$CORE_PID" >/dev/null 2>&1 || true + fi + if [[ -n "$TMPDIR_PATH" ]]; then + cleanup_tmpdir "$TMPDIR_PATH" + fi + } + trap cleanup EXIT + + cat >"$TMPDIR_PATH/inst-a.toml" <<'EOF' +instance_name = "e2e-inst-a" +listeners = [] + +[network_identity] +network_name = "e2e-net-a" +network_secret = "" + +[flags] +no_tun = true +enable_ipv6 = false +EOF + + cat >"$TMPDIR_PATH/inst-b.toml" <<'EOF' +instance_name = "e2e-inst-b" +listeners = [] + +[network_identity] +network_name = "e2e-net-b" +network_secret = "" + +[flags] +no_tun = true +enable_ipv6 = false +EOF + + "$CORE_BIN" --config-dir "$TMPDIR_PATH" --rpc-portal "127.0.0.1:${rpc_port}" \ + >"$TMPDIR_PATH/core.log" 2>&1 & + CORE_PID=$! + print_section "Started easytier-core" + printf 'pid=%s\n' "$CORE_PID" + + wait_for_cli "$rpc_port" + print_output "easytier-core startup log" "$(cat "$TMPDIR_PATH/core.log")" + + local text_output + run_cmd text_output \ + "Case 1: node fanout in table mode" \ + "$CLI_BIN" -p "127.0.0.1:${rpc_port}" node + assert_text_output "$text_output" + + local json_output + run_cmd json_output \ + "Case 2: node fanout in JSON mode" \ + "$CLI_BIN" -p "127.0.0.1:${rpc_port}" -o json node + assert_multi_instance_json "$json_output" + + local single_output + run_cmd single_output \ + "Case 3: explicit instance selector stays single-instance" \ + "$CLI_BIN" -p "127.0.0.1:${rpc_port}" --instance-name e2e-inst-a -o json node + assert_single_instance_json "$single_output" + + local set_whitelist_output + run_cmd set_whitelist_output \ + "Case 4: whitelist set-tcp fans out to all instances" \ + "$CLI_BIN" -p "127.0.0.1:${rpc_port}" whitelist set-tcp 80,443 + + local whitelist_output + run_cmd whitelist_output \ + "Case 5: whitelist show confirms fanout write" \ + "$CLI_BIN" -p "127.0.0.1:${rpc_port}" -o json whitelist show + assert_whitelist_fanout "$whitelist_output" + + local clear_whitelist_output + run_cmd clear_whitelist_output \ + "Case 6: explicit selector write only touches one instance" \ + "$CLI_BIN" -p "127.0.0.1:${rpc_port}" --instance-name e2e-inst-b whitelist clear-tcp + + local cleared_output + run_cmd cleared_output \ + "Case 7: whitelist show confirms single-instance write isolation" \ + "$CLI_BIN" -p "127.0.0.1:${rpc_port}" -o json whitelist show + assert_single_instance_write "$cleared_output" + + print_section "Result" + echo "CLI multi-instance E2E passed" +} + +main "$@"