feat(web): add webhook-managed machine access and multi-instance CLI support (#1989)

* feat: add webhook-managed access and multi-instance CLI support
* fix(foreign): verify credential of foreign credential peer
This commit is contained in:
KKRainbow
2026-03-15 12:08:50 +08:00
committed by GitHub
parent c8f3c5d6aa
commit e6ac31fb20
27 changed files with 2678 additions and 980 deletions
+44 -1
View File
@@ -19,6 +19,7 @@ use maxminddb::geoip2;
use session::{Location, Session};
use storage::{Storage, StorageToken};
use crate::webhook::SharedWebhookConfig;
use crate::FeatureFlags;
use tokio::task::JoinSet;
@@ -59,12 +60,18 @@ pub struct ClientManager {
storage: Storage,
feature_flags: Arc<FeatureFlags>,
webhook_config: SharedWebhookConfig,
geoip_db: Arc<Option<maxminddb::Reader<Vec<u8>>>>,
}
impl ClientManager {
pub fn new(db: Db, geoip_db: Option<String>, feature_flags: Arc<FeatureFlags>) -> Self {
pub fn new(
db: Db,
geoip_db: Option<String>,
feature_flags: Arc<FeatureFlags>,
webhook_config: SharedWebhookConfig,
) -> Self {
let client_sessions = Arc::new(DashMap::new());
let sessions: Arc<DashMap<url::Url, Arc<Session>>> = client_sessions.clone();
let mut tasks = JoinSet::new();
@@ -82,6 +89,7 @@ impl ClientManager {
client_sessions,
storage: Storage::new(db),
feature_flags,
webhook_config,
geoip_db: Arc::new(load_geoip_db(geoip_db)),
}
@@ -98,6 +106,7 @@ impl ClientManager {
let listeners_cnt = self.listeners_cnt.clone();
let geoip_db = self.geoip_db.clone();
let feature_flags = self.feature_flags.clone();
let webhook_config = self.webhook_config.clone();
self.tasks.spawn(async move {
while let Ok(tunnel) = listener.accept().await {
let (tunnel, secure) = match security::accept_or_upgrade_server_tunnel(tunnel).await {
@@ -121,6 +130,7 @@ impl ClientManager {
client_url.clone(),
location,
feature_flags.clone(),
webhook_config.clone(),
);
session.serve(tunnel).await;
sessions.insert(client_url, Arc::new(session));
@@ -165,6 +175,36 @@ impl ClientManager {
.map(|item| item.value().clone())
}
/// Find a session by machine_id regardless of user_id.
pub fn get_session_by_machine_id_global(
&self,
machine_id: &uuid::Uuid,
) -> Option<Arc<Session>> {
self.storage
.get_client_url_by_machine_id_global(machine_id)
.and_then(|url| {
self.client_sessions
.get(&url)
.map(|item| item.value().clone())
})
}
/// Get user_id associated with a machine_id.
pub fn get_user_id_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> Option<UserIdInDb> {
self.storage.get_user_id_by_machine_id_global(machine_id)
}
pub async fn disconnect_session_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> bool {
let Some(client_url) = self.storage.get_client_url_by_machine_id_global(machine_id) else {
return false;
};
let Some((_, session)) = self.client_sessions.remove(&client_url) else {
return false;
};
session.stop().await;
true
}
pub async fn list_machine_by_user_id(&self, user_id: UserIdInDb) -> Vec<url::Url> {
self.storage.list_user_clients(user_id)
}
@@ -321,6 +361,9 @@ mod tests {
Db::memory_db().await,
None,
Arc::new(FeatureFlags::default()),
Arc::new(crate::webhook::WebhookConfig::new(
None, None, None, None, None,
)),
);
mgr.add_listener(Box::new(listener)).await.unwrap();
+180 -18
View File
@@ -18,6 +18,7 @@ use easytier::{
use tokio::sync::{broadcast, RwLock};
use super::storage::{Storage, StorageToken, WeakRefStorage};
use crate::webhook::SharedWebhookConfig;
use crate::FeatureFlags;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -31,9 +32,11 @@ pub struct Location {
pub struct SessionData {
storage: WeakRefStorage,
feature_flags: Arc<FeatureFlags>,
webhook_config: SharedWebhookConfig,
client_url: url::Url,
storage_token: Option<StorageToken>,
binding_version: Option<u64>,
notifier: broadcast::Sender<HeartbeatRequest>,
req: Option<HeartbeatRequest>,
location: Option<Location>,
@@ -45,14 +48,17 @@ impl SessionData {
client_url: url::Url,
location: Option<Location>,
feature_flags: Arc<FeatureFlags>,
webhook_config: SharedWebhookConfig,
) -> Self {
let (tx, _rx1) = broadcast::channel(2);
SessionData {
storage,
feature_flags,
webhook_config,
client_url,
storage_token: None,
binding_version: None,
notifier: tx,
req: None,
location,
@@ -77,6 +83,23 @@ impl Drop for SessionData {
if let Ok(storage) = Storage::try_from(self.storage.clone()) {
if let Some(token) = self.storage_token.as_ref() {
storage.remove_client(token);
// Notify the webhook receiver when a node disconnects.
if self.webhook_config.is_enabled() {
let webhook = self.webhook_config.clone();
let machine_id = token.machine_id.to_string();
let web_instance_id = webhook.web_instance_id.clone();
let binding_version = self.binding_version;
tokio::spawn(async move {
webhook
.notify_node_disconnected(&crate::webhook::NodeDisconnectedRequest {
machine_id,
web_instance_id,
binding_version,
})
.await;
});
}
}
}
}
@@ -90,6 +113,58 @@ struct SessionRpcService {
}
impl SessionRpcService {
async fn persist_webhook_network_config(
storage: &Storage,
user_id: i32,
machine_id: uuid::Uuid,
network_config: serde_json::Value,
) -> anyhow::Result<()> {
let mut network_config = network_config;
let network_name = network_config
.get("network_name")
.and_then(|v| v.as_str())
.filter(|v| !v.is_empty())
.ok_or_else(|| anyhow::anyhow!("webhook response missing network_name"))?
.to_string();
let existing_configs = storage
.db()
.list_network_configs((user_id, machine_id), ListNetworkProps::All)
.await
.map_err(|e| anyhow::anyhow!("failed to list existing network configs: {:?}", e))?;
let inst_id = existing_configs
.iter()
.find_map(|cfg| {
let value = serde_json::from_str::<serde_json::Value>(&cfg.network_config).ok()?;
let cfg_network_name = value.get("network_name")?.as_str()?;
if cfg_network_name == network_name {
uuid::Uuid::parse_str(&cfg.network_instance_id).ok()
} else {
None
}
})
.unwrap_or_else(uuid::Uuid::new_v4);
let config_obj = network_config
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("webhook network_config must be a JSON object"))?;
config_obj.insert(
"instance_id".to_string(),
serde_json::Value::String(inst_id.to_string()),
);
config_obj
.entry("instance_name".to_string())
.or_insert_with(|| serde_json::Value::String(network_name.clone()));
let config = serde_json::from_value::<NetworkConfig>(network_config)?;
storage
.db()
.insert_or_update_user_network_config((user_id, machine_id), inst_id, config)
.await
.map_err(|e| anyhow::anyhow!("failed to persist webhook network config: {:?}", e))?;
Ok(())
}
async fn handle_heartbeat(
&self,
req: HeartbeatRequest,
@@ -106,28 +181,92 @@ impl SessionRpcService {
req.machine_id
))?;
let user_id = match storage
.db()
.get_user_id_by_token(req.user_token.clone())
.await
.with_context(|| {
format!(
"Failed to get user id by token from db: {:?}",
let (user_id, webhook_network_config, webhook_validated, binding_version) = if data
.webhook_config
.is_enabled()
{
let webhook_req = crate::webhook::ValidateTokenRequest {
token: req.user_token.clone(),
machine_id: machine_id.to_string(),
hostname: req.hostname.clone(),
version: req.easytier_version.clone(),
web_instance_id: data.webhook_config.web_instance_id.clone(),
web_instance_api_base_url: data.webhook_config.web_instance_api_base_url.clone(),
};
let resp = data
.webhook_config
.validate_token(&webhook_req)
.await
.map_err(|e| anyhow::anyhow!("Webhook token validation failed: {:?}", e))?;
if resp.valid {
let user_id = match storage
.db()
.get_user_id_by_token(req.user_token.clone())
.await
.map_err(|e| anyhow::anyhow!("DB error: {:?}", e))?
{
Some(id) => id,
None => storage
.auto_create_user(&req.user_token)
.await
.with_context(|| {
format!("Failed to auto-create webhook user: {:?}", req.user_token)
})?,
};
(
user_id,
resp.network_config,
true,
Some(resp.binding_version),
)
} else {
return Err(anyhow::anyhow!(
"Webhook rejected token for machine {:?}: {:?}",
machine_id,
req.user_token
)
})? {
Some(id) => id,
None if data.feature_flags.allow_auto_create_user => storage
.auto_create_user(&req.user_token)
.await
.with_context(|| format!("Failed to auto-create user: {:?}", req.user_token))?,
None => {
return Err(
anyhow::anyhow!("User not found by token: {:?}", req.user_token).into(),
);
.into());
}
} else {
let user_id = match storage
.db()
.get_user_id_by_token(req.user_token.clone())
.await
.with_context(|| {
format!(
"Failed to get user id by token from db: {:?}",
req.user_token
)
})? {
Some(id) => id,
None if data.feature_flags.allow_auto_create_user => storage
.auto_create_user(&req.user_token)
.await
.with_context(|| format!("Failed to auto-create user: {:?}", req.user_token))?,
None => {
return Err(
anyhow::anyhow!("User not found by token: {:?}", req.user_token).into(),
);
}
};
(user_id, None, false, None)
};
if webhook_validated {
if let Some(network_config) = webhook_network_config {
Self::persist_webhook_network_config(&storage, user_id, machine_id, network_config)
.await
.map_err(rpc_types::error::Error::from)?;
}
} else if webhook_network_config.is_some() {
return Err(anyhow::anyhow!(
"unexpected webhook network_config for non-webhook token {:?}",
req.user_token
)
.into());
}
if data.req.replace(req.clone()).is_none() {
assert!(data.storage_token.is_none());
data.storage_token = Some(StorageToken {
@@ -136,6 +275,23 @@ impl SessionRpcService {
machine_id,
user_id,
});
data.binding_version = binding_version;
// Notify the webhook receiver on the first successful heartbeat.
if data.webhook_config.is_enabled() {
let webhook = data.webhook_config.clone();
let connect_req = crate::webhook::NodeConnectedRequest {
machine_id: machine_id.to_string(),
token: req.user_token.clone(),
hostname: req.hostname.clone(),
version: req.easytier_version.clone(),
web_instance_id: webhook.web_instance_id.clone(),
binding_version,
};
tokio::spawn(async move {
webhook.notify_node_connected(&connect_req).await;
});
}
}
let Ok(report_time) = chrono::DateTime::<chrono::Local>::from_str(&req.report_time) else {
@@ -203,8 +359,10 @@ impl Session {
client_url: url::Url,
location: Option<Location>,
feature_flags: Arc<FeatureFlags>,
webhook_config: SharedWebhookConfig,
) -> Self {
let session_data = SessionData::new(storage, client_url, location, feature_flags);
let session_data =
SessionData::new(storage, client_url, location, feature_flags, webhook_config);
let data = Arc::new(RwLock::new(session_data));
let rpc_mgr =
@@ -335,6 +493,10 @@ impl Session {
self.rpc_mgr.is_running()
}
pub async fn stop(&self) {
self.rpc_mgr.stop().await;
}
pub fn data(&self) -> SharedSessionData {
self.data.clone()
}
+82 -12
View File
@@ -22,6 +22,7 @@ struct ClientInfo {
#[derive(Debug)]
pub struct StorageInner {
user_clients_map: DashMap<UserIdInDb, DashMap<uuid::Uuid, ClientInfo>>,
global_machine_map: DashMap<uuid::Uuid, ClientInfo>,
pub db: Db,
}
@@ -41,22 +42,19 @@ impl Storage {
pub fn new(db: Db) -> Self {
Storage(Arc::new(StorageInner {
user_clients_map: DashMap::new(),
global_machine_map: DashMap::new(),
db,
}))
}
fn remove_mid_to_client_info_map(
map: &DashMap<uuid::Uuid, ClientInfo>,
machine_id: &uuid::Uuid,
client_url: &url::Url,
) {
map.remove_if(machine_id, |_, v| v.storage_token.client_url == *client_url);
fn remove_client_info_map(map: &DashMap<uuid::Uuid, ClientInfo>, stoken: &StorageToken) {
map.remove_if(&stoken.machine_id, |_, v| {
v.storage_token.client_url == stoken.client_url
&& v.storage_token.user_id == stoken.user_id
});
}
fn update_mid_to_client_info_map(
map: &DashMap<uuid::Uuid, ClientInfo>,
client_info: &ClientInfo,
) {
fn update_client_info_map(map: &DashMap<uuid::Uuid, ClientInfo>, client_info: &ClientInfo) {
map.entry(client_info.storage_token.machine_id)
.and_modify(|e| {
if e.report_time < client_info.report_time {
@@ -78,14 +76,16 @@ impl Storage {
report_time,
};
Self::update_mid_to_client_info_map(&inner, &client_info);
Self::update_client_info_map(&inner, &client_info);
Self::update_client_info_map(&self.0.global_machine_map, &client_info);
}
pub fn remove_client(&self, stoken: &StorageToken) {
Self::remove_client_info_map(&self.0.global_machine_map, stoken);
self.0
.user_clients_map
.remove_if(&stoken.user_id, |_, set| {
Self::remove_mid_to_client_info_map(set, &stoken.machine_id, &stoken.client_url);
Self::remove_client_info_map(set, stoken);
set.is_empty()
});
}
@@ -106,6 +106,22 @@ impl Storage {
})
}
/// Find client_url by machine_id across all users.
pub fn get_client_url_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> Option<url::Url> {
self.0
.global_machine_map
.get(machine_id)
.map(|info| info.storage_token.client_url.clone())
}
/// Find user_id by machine_id across all users.
pub fn get_user_id_by_machine_id_global(&self, machine_id: &uuid::Uuid) -> Option<UserIdInDb> {
self.0
.global_machine_map
.get(machine_id)
.map(|info| info.storage_token.user_id)
}
pub fn list_user_clients(&self, user_id: UserIdInDb) -> Vec<url::Url> {
self.0
.user_clients_map
@@ -129,3 +145,57 @@ impl Storage {
Ok(new_user.id)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_storage_token(
user_id: UserIdInDb,
machine_id: uuid::Uuid,
client_url: &str,
) -> StorageToken {
StorageToken {
token: format!("token-{machine_id}"),
client_url: client_url.parse().unwrap(),
machine_id,
user_id,
}
}
#[tokio::test]
async fn global_machine_index_uses_latest_report_and_ignores_stale_removal() {
let storage = Storage::new(Db::memory_db().await);
let machine_id = uuid::Uuid::new_v4();
let old_token = make_storage_token(1, machine_id, "tcp://127.0.0.1:1001");
let new_token = make_storage_token(1, machine_id, "tcp://127.0.0.1:1002");
storage.update_client(old_token.clone(), 10);
storage.update_client(new_token.clone(), 20);
assert_eq!(
storage.get_client_url_by_machine_id_global(&machine_id),
Some(new_token.client_url.clone())
);
assert_eq!(
storage.get_user_id_by_machine_id_global(&machine_id),
Some(1)
);
storage.remove_client(&old_token);
assert_eq!(
storage.get_client_url_by_machine_id_global(&machine_id),
Some(new_token.client_url.clone())
);
storage.remove_client(&new_token);
assert_eq!(
storage.get_client_url_by_machine_id_global(&machine_id),
None
);
assert_eq!(storage.get_user_id_by_machine_id_global(&machine_id), None);
}
}