fix: reconcile webhook-managed configs and make disable_p2p more intelligent (#2057)

* reconcile infra configs on webhook validate
* make disable_p2p more intelligent
* fix stats
This commit is contained in:
KKRainbow
2026-04-04 23:41:57 +08:00
committed by GitHub
parent e91a0da70a
commit fb59f01058
17 changed files with 657 additions and 107 deletions
@@ -117,7 +117,7 @@ disable_quic_input: 禁用 QUIC 输入
disable_quic_input_help: 禁用 QUIC 入站流量,其他开启 QUIC 代理的节点仍然使用 TCP 连接到本节点。
disable_p2p: 禁用 P2P
disable_p2p_help: 禁用 P2P 模式,所有流量通过手动指定的服务器中转
disable_p2p_help: 禁用普通自动 P2P。开启 need-p2p 的节点仍可与当前节点建立 P2P
p2p_only: 仅 P2P
p2p_only_help: 仅与已经建立P2P连接的对等节点通信,不通过其他节点中转。
@@ -116,7 +116,7 @@ disable_quic_input: Disable QUIC Input
disable_quic_input_help: Disable inbound QUIC traffic, while nodes with QUIC proxy enabled continue to connect using TCP.
disable_p2p: Disable P2P
disable_p2p_help: Disable P2P mode; route all traffic through a manually specified relay server.
disable_p2p_help: Disable ordinary automatic P2P. Nodes with need-p2p enabled can still establish P2P with this node.
p2p_only: P2P Only
p2p_only_help: Only communicate with peers that have already established P2P connections, do not relay through other nodes.
+201 -56
View File
@@ -1,4 +1,9 @@
use std::{collections::HashSet, fmt::Debug, str::FromStr as _, sync::Arc};
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
str::FromStr as _,
sync::Arc,
};
use anyhow::Context;
use easytier::{
@@ -37,6 +42,7 @@ pub struct SessionData {
storage_token: Option<StorageToken>,
binding_version: Option<u64>,
applied_config_revision: Option<String>,
notifier: broadcast::Sender<HeartbeatRequest>,
req: Option<HeartbeatRequest>,
location: Option<Location>,
@@ -59,6 +65,7 @@ impl SessionData {
client_url,
storage_token: None,
binding_version: None,
applied_config_revision: None,
notifier: tx,
req: None,
location,
@@ -117,37 +124,16 @@ struct SessionRpcService {
}
impl SessionRpcService {
async fn persist_webhook_network_config(
storage: &Storage,
user_id: i32,
machine_id: uuid::Uuid,
network_config: serde_json::Value,
) -> anyhow::Result<()> {
let mut network_config = network_config;
fn normalize_network_config(
mut network_config: serde_json::Value,
inst_id: uuid::Uuid,
) -> anyhow::Result<NetworkConfig> {
let network_name = network_config
.get("network_name")
.and_then(|v| v.as_str())
.filter(|v| !v.is_empty())
.ok_or_else(|| anyhow::anyhow!("webhook response missing network_name"))?
.to_string();
let existing_configs = storage
.db()
.list_network_configs((user_id, machine_id), ListNetworkProps::All)
.await
.map_err(|e| anyhow::anyhow!("failed to list existing network configs: {:?}", e))?;
let inst_id = existing_configs
.iter()
.find_map(|cfg| {
let value = serde_json::from_str::<serde_json::Value>(&cfg.network_config).ok()?;
let cfg_network_name = value.get("network_name")?.as_str()?;
if cfg_network_name == network_name {
uuid::Uuid::parse_str(&cfg.network_instance_id).ok()
} else {
None
}
})
.unwrap_or_else(uuid::Uuid::new_v4);
let config_obj = network_config
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("webhook network_config must be a JSON object"))?;
@@ -157,14 +143,66 @@ impl SessionRpcService {
);
config_obj
.entry("instance_name".to_string())
.or_insert_with(|| serde_json::Value::String(network_name.clone()));
.or_insert_with(|| serde_json::Value::String(network_name));
let config = serde_json::from_value::<NetworkConfig>(network_config)?;
storage
Ok(serde_json::from_value::<NetworkConfig>(network_config)?)
}
async fn reconcile_managed_network_configs(
storage: &Storage,
user_id: i32,
machine_id: uuid::Uuid,
desired_configs: Vec<crate::webhook::ManagedNetworkConfig>,
) -> anyhow::Result<()> {
let existing_configs = storage
.db()
.insert_or_update_user_network_config((user_id, machine_id), inst_id, config)
.list_network_configs((user_id, machine_id), ListNetworkProps::All)
.await
.map_err(|e| anyhow::anyhow!("failed to persist webhook network config: {:?}", e))?;
.map_err(|e| anyhow::anyhow!("failed to list existing network configs: {:?}", e))?;
let existing_ids = existing_configs
.iter()
.filter_map(|cfg| uuid::Uuid::parse_str(&cfg.network_instance_id).ok())
.collect::<HashSet<_>>();
let mut desired_ids = HashSet::with_capacity(desired_configs.len());
let mut normalized = HashMap::with_capacity(desired_configs.len());
for desired in desired_configs {
let inst_id = uuid::Uuid::parse_str(&desired.instance_id).with_context(|| {
format!(
"invalid desired managed instance id: {}",
desired.instance_id
)
})?;
let config = Self::normalize_network_config(desired.network_config, inst_id)?;
desired_ids.insert(inst_id);
normalized.insert(inst_id, config);
}
for (inst_id, config) in normalized {
storage
.db()
.insert_or_update_user_network_config((user_id, machine_id), inst_id, config)
.await
.map_err(|e| {
anyhow::anyhow!(
"failed to persist managed network config {}: {:?}",
inst_id,
e
)
})?;
}
let stale_ids = existing_ids
.difference(&desired_ids)
.copied()
.collect::<Vec<_>>();
if !stale_ids.is_empty() {
storage
.db()
.delete_network_configs((user_id, machine_id), &stale_ids)
.await
.map_err(|e| anyhow::anyhow!("failed to delete stale network configs: {:?}", e))?;
}
Ok(())
}
@@ -185,10 +223,13 @@ impl SessionRpcService {
req.machine_id
))?;
let (user_id, webhook_network_config, webhook_validated, binding_version) = if data
.webhook_config
.is_enabled()
{
let (
user_id,
webhook_managed_network_configs,
webhook_config_revision,
webhook_validated,
binding_version,
) = if data.webhook_config.is_enabled() {
let webhook_req = crate::webhook::ValidateTokenRequest {
token: req.user_token.clone(),
machine_id: machine_id.to_string(),
@@ -223,7 +264,8 @@ impl SessionRpcService {
};
(
user_id,
resp.network_config,
resp.managed_network_configs,
resp.config_revision,
true,
Some(resp.binding_version),
)
@@ -257,21 +299,21 @@ impl SessionRpcService {
);
}
};
(user_id, None, false, None)
(user_id, Vec::new(), String::new(), false, None)
};
if webhook_validated {
if let Some(network_config) = webhook_network_config {
Self::persist_webhook_network_config(&storage, user_id, machine_id, network_config)
.await
.map_err(rpc_types::error::Error::from)?;
}
} else if webhook_network_config.is_some() {
return Err(anyhow::anyhow!(
"unexpected webhook network_config for non-webhook token {:?}",
req.user_token
if webhook_validated
&& data.applied_config_revision.as_deref() != Some(webhook_config_revision.as_str())
{
Self::reconcile_managed_network_configs(
&storage,
user_id,
machine_id,
webhook_managed_network_configs,
)
.into());
.await
.map_err(rpc_types::error::Error::from)?;
data.applied_config_revision = Some(webhook_config_revision);
}
if data.req.replace(req.clone()).is_none() {
@@ -411,6 +453,7 @@ impl Session {
rpc_client: SessionRpcClient,
) {
let mut cleaned_web_managed_instances = false;
let mut last_desired_inst_ids: Option<HashSet<String>> = None;
loop {
heartbeat_waiter = heartbeat_waiter.resubscribe();
let req = heartbeat_waiter.recv().await;
@@ -467,8 +510,15 @@ impl Session {
};
let mut has_failed = false;
let should_be_alive_inst_ids = local_configs
.iter()
.map(|cfg| cfg.network_instance_id.clone())
.collect::<HashSet<_>>();
let desired_changed = last_desired_inst_ids
.as_ref()
.is_none_or(|last| last != &should_be_alive_inst_ids);
if !cleaned_web_managed_instances {
if !cleaned_web_managed_instances || desired_changed {
let all_local_configs = match storage
.db
.list_network_configs((user_id, machine_id.into()), ListNetworkProps::All)
@@ -486,11 +536,6 @@ impl Session {
.map(|cfg| cfg.network_instance_id.clone())
.collect::<HashSet<_>>();
let should_be_alive_inst_ids = local_configs
.iter()
.map(|cfg| cfg.network_instance_id.clone())
.collect::<HashSet<_>>();
let should_delete_ids = running_inst_ids
.iter()
.chain(all_inst_ids.iter())
@@ -519,6 +564,7 @@ impl Session {
if !has_failed {
cleaned_web_managed_instances = true;
last_desired_inst_ids = Some(should_be_alive_inst_ids.clone());
}
}
@@ -549,8 +595,7 @@ impl Session {
}
if !has_failed {
tracing::info!(?req, "All network instances are running");
break;
last_desired_inst_ids = Some(should_be_alive_inst_ids);
}
}
}
@@ -585,3 +630,103 @@ impl Session {
self.data.read().await.req()
}
}
#[cfg(test)]
mod tests {
use easytier::rpc_service::remote_client::{ListNetworkProps, Storage as _};
use serde_json::json;
use super::{super::storage::Storage, *};
#[tokio::test]
async fn reconcile_managed_network_configs_upserts_and_deletes_exact_set() {
let storage = Storage::new(crate::db::Db::memory_db().await);
let user_id = storage
.db()
.auto_create_user("webhook-user")
.await
.unwrap()
.id;
let machine_id = uuid::Uuid::new_v4();
let keep_id = uuid::Uuid::new_v4();
let stale_id = uuid::Uuid::new_v4();
let new_id = uuid::Uuid::new_v4();
storage
.db()
.insert_or_update_user_network_config(
(user_id, machine_id),
keep_id,
NetworkConfig {
network_name: Some("old-name".to_string()),
..Default::default()
},
)
.await
.unwrap();
storage
.db()
.insert_or_update_user_network_config(
(user_id, machine_id),
stale_id,
NetworkConfig {
network_name: Some("stale".to_string()),
..Default::default()
},
)
.await
.unwrap();
SessionRpcService::reconcile_managed_network_configs(
&storage,
user_id,
machine_id,
vec![
crate::webhook::ManagedNetworkConfig {
instance_id: keep_id.to_string(),
network_config: json!({
"instance_id": keep_id.to_string(),
"network_name": "updated-name"
}),
},
crate::webhook::ManagedNetworkConfig {
instance_id: new_id.to_string(),
network_config: json!({
"instance_id": new_id.to_string(),
"network_name": "new-name"
}),
},
],
)
.await
.unwrap();
let configs = storage
.db()
.list_network_configs((user_id, machine_id), ListNetworkProps::All)
.await
.unwrap();
let config_ids = configs
.iter()
.map(|cfg| cfg.network_instance_id.clone())
.collect::<HashSet<_>>();
assert_eq!(configs.len(), 2);
assert!(config_ids.contains(&keep_id.to_string()));
assert!(config_ids.contains(&new_id.to_string()));
assert!(!config_ids.contains(&stale_id.to_string()));
let updated_keep = storage
.db()
.get_network_config((user_id, machine_id), &keep_id.to_string())
.await
.unwrap()
.unwrap();
let updated_keep_config: NetworkConfig =
serde_json::from_str(&updated_keep.network_config).unwrap();
assert_eq!(
updated_keep_config.network_name.as_deref(),
Some("updated-name")
);
}
}
+71 -9
View File
@@ -154,13 +154,17 @@ impl Storage<(UserIdInDb, Uuid), user_running_network_configs::Model, DbErr> for
use entity::user_running_network_configs as urnc;
let on_conflict = OnConflict::column(urnc::Column::NetworkInstanceId)
.update_columns([
urnc::Column::NetworkConfig,
urnc::Column::Disabled,
urnc::Column::UpdateTime,
])
.to_owned();
let on_conflict = OnConflict::columns([
urnc::Column::UserId,
urnc::Column::DeviceId,
urnc::Column::NetworkInstanceId,
])
.update_columns([
urnc::Column::NetworkConfig,
urnc::Column::Disabled,
urnc::Column::UpdateTime,
])
.to_owned();
let insert_m = urnc::ActiveModel {
user_id: sea_orm::Set(user_id),
device_id: sea_orm::Set(device_id.to_string()),
@@ -184,13 +188,14 @@ impl Storage<(UserIdInDb, Uuid), user_running_network_configs::Model, DbErr> for
async fn delete_network_configs(
&self,
(user_id, _): (UserIdInDb, Uuid),
(user_id, device_id): (UserIdInDb, Uuid),
network_inst_ids: &[Uuid],
) -> Result<(), DbErr> {
use entity::user_running_network_configs as urnc;
urnc::Entity::delete_many()
.filter(urnc::Column::UserId.eq(user_id))
.filter(urnc::Column::DeviceId.eq(device_id.to_string()))
.filter(
urnc::Column::NetworkInstanceId
.is_in(network_inst_ids.iter().map(|id| id.to_string())),
@@ -203,7 +208,7 @@ impl Storage<(UserIdInDb, Uuid), user_running_network_configs::Model, DbErr> for
async fn update_network_config_state(
&self,
(user_id, _): (UserIdInDb, Uuid),
(user_id, device_id): (UserIdInDb, Uuid),
network_inst_id: Uuid,
disabled: bool,
) -> Result<(), DbErr> {
@@ -211,6 +216,7 @@ impl Storage<(UserIdInDb, Uuid), user_running_network_configs::Model, DbErr> for
urnc::Entity::update_many()
.filter(urnc::Column::UserId.eq(user_id))
.filter(urnc::Column::DeviceId.eq(device_id.to_string()))
.filter(urnc::Column::NetworkInstanceId.eq(network_inst_id.to_string()))
.col_expr(urnc::Column::Disabled, Expr::value(disabled))
.col_expr(
@@ -341,4 +347,60 @@ mod tests {
.unwrap();
assert!(result3.is_none());
}
#[tokio::test]
async fn test_user_network_config_same_instance_id_is_scoped_by_device() {
let db = Db::memory_db().await;
let user_id = db.auto_create_user("user-1").await.unwrap().id;
let device1 = uuid::Uuid::new_v4();
let device2 = uuid::Uuid::new_v4();
let inst_id = uuid::Uuid::new_v4();
db.insert_or_update_user_network_config(
(user_id, device1),
inst_id,
NetworkConfig {
network_name: Some("cfg-1".to_string()),
..Default::default()
},
)
.await
.unwrap();
db.insert_or_update_user_network_config(
(user_id, device2),
inst_id,
NetworkConfig {
network_name: Some("cfg-2".to_string()),
..Default::default()
},
)
.await
.unwrap();
let first = db
.get_network_config((user_id, device1), &inst_id.to_string())
.await
.unwrap()
.unwrap();
let second = db
.get_network_config((user_id, device2), &inst_id.to_string())
.await
.unwrap()
.unwrap();
assert_eq!(first.user_id, user_id);
assert_eq!(first.device_id, device1.to_string());
assert_eq!(second.user_id, user_id);
assert_eq!(second.device_id, device2.to_string());
let device1_configs = db
.list_network_configs((user_id, device1), ListNetworkProps::All)
.await
.unwrap();
let device2_configs = db
.list_network_configs((user_id, device2), ListNetworkProps::All)
.await
.unwrap();
assert_eq!(device1_configs.len(), 1);
assert_eq!(device2_configs.len(), 1);
}
}
@@ -0,0 +1,120 @@
use sea_orm_migration::prelude::*;
pub struct Migration;
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20260403_000002_scope_network_config_unique"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(
r#"
CREATE TABLE user_running_network_configs_new (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
user_id INTEGER NOT NULL,
device_id TEXT NOT NULL,
network_instance_id TEXT NOT NULL,
network_config TEXT NOT NULL,
disabled BOOLEAN NOT NULL DEFAULT FALSE,
create_time TEXT NOT NULL,
update_time TEXT NOT NULL,
CONSTRAINT fk_user_running_network_configs_user_id_to_users_id
FOREIGN KEY (user_id) REFERENCES users(id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
INSERT INTO user_running_network_configs_new (
id,
user_id,
device_id,
network_instance_id,
network_config,
disabled,
create_time,
update_time
)
SELECT
id,
user_id,
device_id,
network_instance_id,
network_config,
disabled,
create_time,
update_time
FROM user_running_network_configs;
DROP TABLE user_running_network_configs;
ALTER TABLE user_running_network_configs_new RENAME TO user_running_network_configs;
CREATE INDEX idx_user_running_network_configs_user_id
ON user_running_network_configs(user_id);
CREATE UNIQUE INDEX idx_user_running_network_configs_scope_inst
ON user_running_network_configs(user_id, device_id, network_instance_id);
"#,
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(
r#"
CREATE TABLE user_running_network_configs_old (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
user_id INTEGER NOT NULL,
device_id TEXT NOT NULL,
network_instance_id TEXT NOT NULL UNIQUE,
network_config TEXT NOT NULL,
disabled BOOLEAN NOT NULL DEFAULT FALSE,
create_time TEXT NOT NULL,
update_time TEXT NOT NULL,
CONSTRAINT fk_user_running_network_configs_user_id_to_users_id
FOREIGN KEY (user_id) REFERENCES users(id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
INSERT INTO user_running_network_configs_old (
id,
user_id,
device_id,
network_instance_id,
network_config,
disabled,
create_time,
update_time
)
SELECT
id,
user_id,
device_id,
network_instance_id,
network_config,
disabled,
create_time,
update_time
FROM user_running_network_configs;
DROP TABLE user_running_network_configs;
ALTER TABLE user_running_network_configs_old RENAME TO user_running_network_configs;
CREATE INDEX idx_user_running_network_configs_user_id
ON user_running_network_configs(user_id);
"#,
)
.await?;
Ok(())
}
}
+5 -1
View File
@@ -1,12 +1,16 @@
use sea_orm_migration::prelude::*;
mod m20241029_000001_init;
mod m20260403_000002_scope_network_config_unique;
pub struct Migrator;
#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![Box::new(m20241029_000001_init::Migration)]
vec![
Box::new(m20241029_000001_init::Migration),
Box::new(m20260403_000002_scope_network_config_unique::Migration),
]
}
}
+8 -1
View File
@@ -65,7 +65,14 @@ pub struct ValidateTokenResponse {
pub pre_approved: bool,
#[serde(default)]
pub binding_version: u64,
pub network_config: Option<serde_json::Value>,
pub managed_network_configs: Vec<ManagedNetworkConfig>,
pub config_revision: String,
}
#[derive(Debug, Clone, Deserialize)]
pub struct ManagedNetworkConfig {
pub instance_id: String,
pub network_config: serde_json::Value,
}
#[derive(Debug, Serialize)]