introduce uptime monitor for easytier public nodes (#1250)

This commit is contained in:
Sijie.Sun
2025-08-20 22:59:44 +08:00
committed by GitHub
parent 8f37d4ef7c
commit e6ec7f405c
61 changed files with 12122 additions and 17 deletions
@@ -0,0 +1,360 @@
use crate::db::entity::*;
use crate::db::Db;
use sea_orm::*;
use tokio::time::{sleep, Duration};
use tracing::{error, info, warn};
/// 数据清理策略配置
#[derive(Debug, Clone)]
pub struct CleanupConfig {
/// 健康记录保留天数
pub health_record_retention_days: i64,
/// 每个节点保留的健康记录最大数量
pub max_health_records_per_node: u64,
/// 清理任务运行间隔(秒)
pub cleanup_interval_seconds: u64,
/// 是否启用自动清理
pub auto_cleanup_enabled: bool,
}
impl Default for CleanupConfig {
fn default() -> Self {
Self {
health_record_retention_days: 30,
max_health_records_per_node: 70000,
cleanup_interval_seconds: 1200, // 20分钟
auto_cleanup_enabled: true,
}
}
}
/// 数据清理管理器
pub struct CleanupManager {
db: Db,
config: CleanupConfig,
running: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl CleanupManager {
/// 创建新的清理管理器
pub fn new(db: Db, config: CleanupConfig) -> Self {
Self {
db,
config,
running: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
}
}
/// 使用默认配置创建清理管理器
pub fn with_default_config(db: Db) -> Self {
Self::new(db, CleanupConfig::default())
}
/// 启动自动清理任务
pub async fn start_auto_cleanup(&self) -> anyhow::Result<()> {
if self.config.auto_cleanup_enabled {
let running = self.running.clone();
let db = self.db.clone();
let config = self.config.clone();
running.store(true, std::sync::atomic::Ordering::SeqCst);
tokio::spawn(async move {
info!("Auto cleanup task started");
while running.load(std::sync::atomic::Ordering::SeqCst) {
if let Err(e) = Self::perform_cleanup(&db, &config).await {
error!("Auto cleanup failed: {}", e);
}
sleep(Duration::from_secs(config.cleanup_interval_seconds)).await;
}
info!("Auto cleanup task stopped");
});
}
Ok(())
}
/// 停止自动清理任务
pub fn stop_auto_cleanup(&self) {
self.running
.store(false, std::sync::atomic::Ordering::SeqCst);
}
/// 执行一次完整的清理操作
pub async fn perform_cleanup(db: &Db, config: &CleanupConfig) -> anyhow::Result<CleanupResult> {
let mut result = CleanupResult::default();
// 清理旧的健康记录
let health_cleanup_result =
Self::cleanup_old_health_records(db, config.health_record_retention_days).await?;
result.old_health_records_cleaned = health_cleanup_result.records_removed;
// 清理过量的健康记录
let excess_cleanup_result =
Self::cleanup_excess_health_records(db, config.max_health_records_per_node).await?;
result.excess_health_records_cleaned = excess_cleanup_result.records_removed;
// 数据库维护
let maintenance_result = Self::perform_database_maintenance(db).await?;
result.vacuum_performed = maintenance_result.vacuum_performed;
result.analyze_performed = maintenance_result.analyze_performed;
info!("Cleanup completed: {:?}", result);
Ok(result)
}
/// 清理旧的健康记录
async fn cleanup_old_health_records(
db: &Db,
days: i64,
) -> anyhow::Result<CleanupHealthRecordsResult> {
let cutoff = chrono::Local::now().fixed_offset() - chrono::Duration::days(days);
let result = health_records::Entity::delete_many()
.filter(health_records::Column::CheckedAt.lt(cutoff))
.exec(db.orm_db())
.await?;
let records_removed = result.rows_affected;
if records_removed > 0 {
info!(
"Cleaned {} old health records (older than {} days)",
records_removed, days
);
}
Ok(CleanupHealthRecordsResult { records_removed })
}
/// 清理过量的健康记录
async fn cleanup_excess_health_records(
db: &Db,
max_records: u64,
) -> anyhow::Result<CleanupExcessRecordsResult> {
// 获取所有节点
let nodes = shared_nodes::Entity::find().all(db.orm_db()).await?;
let mut total_removed = 0;
for node in nodes {
// 计算需要删除的记录数量
let total_count = health_records::Entity::find()
.filter(health_records::Column::NodeId.eq(node.id))
.count(db.orm_db())
.await?;
if total_count > max_records {
let to_remove = total_count - max_records;
// 获取需要保留的最小ID
let keep_id = health_records::Entity::find()
.filter(health_records::Column::NodeId.eq(node.id))
.order_by_desc(health_records::Column::CheckedAt)
.offset(max_records)
.limit(1)
.into_model::<health_records::Model>()
.one(db.orm_db())
.await?;
info!(
"Node {}: total count: {}, to remove: {}, last keep record: {:?}",
node.id, total_count, to_remove, keep_id
);
if let Some(keep_record) = keep_id {
// 删除比保留记录更早的记录
let result = health_records::Entity::delete_many()
.filter(health_records::Column::NodeId.eq(node.id))
.filter(health_records::Column::Id.lt(keep_record.id))
.exec(db.orm_db())
.await?;
total_removed += result.rows_affected;
}
}
}
if total_removed > 0 {
info!(
"Cleaned {} excess health records (max {} per node)",
total_removed, max_records
);
}
Ok(CleanupExcessRecordsResult {
records_removed: total_removed,
})
}
/// 执行数据库维护操作
async fn perform_database_maintenance(db: &Db) -> anyhow::Result<DatabaseMaintenanceResult> {
let mut vacuum_performed = false;
let mut analyze_performed = false;
// 执行 ANALYZE
match db
.orm_db()
.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"ANALYZE".to_string(),
))
.await
{
Ok(_) => {
analyze_performed = true;
info!("Database ANALYZE completed");
}
Err(e) => {
warn!("Database ANALYZE failed: {}", e);
}
}
// 执行 VACUUM(仅在需要时)
if vacuum_performed || analyze_performed {
match db
.orm_db()
.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"VACUUM".to_string(),
))
.await
{
Ok(_) => {
vacuum_performed = true;
info!("Database VACUUM completed");
}
Err(e) => {
warn!("Database VACUUM failed: {}", e);
}
}
}
Ok(DatabaseMaintenanceResult {
vacuum_performed,
analyze_performed,
})
}
/// 获取数据库统计信息
pub async fn get_database_stats(db: &Db) -> anyhow::Result<DatabaseStats> {
let total_nodes = shared_nodes::Entity::find().count(db.orm_db()).await?;
let total_health_records = health_records::Entity::find().count(db.orm_db()).await?;
let active_nodes = shared_nodes::Entity::find()
.filter(shared_nodes::Column::IsActive.eq(true))
.count(db.orm_db())
.await?;
Ok(DatabaseStats {
total_nodes,
active_nodes,
total_health_records,
})
}
/// 获取清理配置
pub fn get_config(&self) -> &CleanupConfig {
&self.config
}
/// 更新清理配置
pub fn update_config(&mut self, config: CleanupConfig) {
self.config = config;
}
}
/// 清理结果
#[derive(Default, Debug, Clone, serde::Serialize)]
pub struct CleanupResult {
pub old_health_records_cleaned: u64,
pub old_instances_cleaned: u64,
pub excess_health_records_cleaned: u64,
pub vacuum_performed: bool,
pub analyze_performed: bool,
}
/// 健康记录清理结果
#[derive(Debug, Clone, serde::Serialize)]
pub struct CleanupHealthRecordsResult {
pub records_removed: u64,
}
/// 停止实例清理结果
#[derive(Debug, Clone, serde::Serialize)]
pub struct CleanupStoppedInstancesResult {
pub instances_removed: u64,
}
/// 过量记录清理结果
#[derive(Debug, Clone, serde::Serialize)]
pub struct CleanupExcessRecordsResult {
pub records_removed: u64,
}
/// 数据库维护结果
#[derive(Debug, Clone, serde::Serialize)]
pub struct DatabaseMaintenanceResult {
pub vacuum_performed: bool,
pub analyze_performed: bool,
}
/// 数据库统计信息
#[derive(Debug, Clone, serde::Serialize)]
pub struct DatabaseStats {
pub total_nodes: u64,
pub active_nodes: u64,
pub total_health_records: u64,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Db;
#[tokio::test]
async fn test_cleanup_manager() {
let db = Db::memory_db().await;
let cleanup_manager = CleanupManager::with_default_config(db.clone());
// 测试获取配置
let config = cleanup_manager.get_config();
assert_eq!(config.health_record_retention_days, 30);
// 测试清理操作
let result = CleanupManager::perform_cleanup(&db, config).await.unwrap();
println!("Cleanup result: {:?}", result);
// 测试获取统计信息
let stats = CleanupManager::get_database_stats(&db).await.unwrap();
println!("Database stats: {:?}", stats);
}
#[tokio::test]
async fn test_cleanup_config() {
let config = CleanupConfig {
health_record_retention_days: 7,
max_health_records_per_node: 500,
cleanup_interval_seconds: 1800,
auto_cleanup_enabled: false,
};
let db = Db::memory_db().await;
let mut cleanup_manager = CleanupManager::new(db, config.clone());
assert_eq!(cleanup_manager.get_config().health_record_retention_days, 7);
// 测试更新配置
let new_config = CleanupConfig::default();
cleanup_manager.update_config(new_config);
assert_eq!(
cleanup_manager.get_config().health_record_retention_days,
30
);
}
}
@@ -0,0 +1,39 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "connection_instances")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub node_id: i32,
#[sea_orm(unique)]
pub instance_id: String,
pub status: String,
#[sea_orm(column_type = "Text")]
pub config: String,
pub started_at: DateTimeWithTimeZone,
pub stopped_at: DateTimeWithTimeZone,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::shared_nodes::Entity",
from = "Column::NodeId",
to = "super::shared_nodes::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
SharedNodes,
}
impl Related<super::shared_nodes::Entity> for Entity {
fn to() -> RelationDef {
Relation::SharedNodes.def()
}
}
impl ActiveModelBehavior for ActiveModel {}
@@ -0,0 +1,37 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "health_records")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub node_id: i32,
pub status: String,
pub response_time: i32,
#[sea_orm(column_type = "Text")]
pub error_message: String,
pub checked_at: DateTimeWithTimeZone,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::shared_nodes::Entity",
from = "Column::NodeId",
to = "super::shared_nodes::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
SharedNodes,
}
impl Related<super::shared_nodes::Entity> for Entity {
fn to() -> RelationDef {
Relation::SharedNodes.def()
}
}
impl ActiveModelBehavior for ActiveModel {}
@@ -0,0 +1,6 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
pub mod prelude;
pub mod health_records;
pub mod shared_nodes;
@@ -0,0 +1,4 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
pub use super::health_records::Entity as HealthRecords;
pub use super::shared_nodes::Entity as SharedNodes;
@@ -0,0 +1,44 @@
//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "shared_nodes")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub name: String,
pub host: String,
pub port: i32,
pub protocol: String,
pub version: String,
pub allow_relay: bool,
pub network_name: String,
pub network_secret: String,
#[sea_orm(column_type = "Text")]
pub description: String,
pub max_connections: i32,
pub current_connections: i32,
pub is_active: bool,
pub is_approved: bool,
pub qq_number: String,
pub wechat: String,
pub mail: String,
pub created_at: DateTimeWithTimeZone,
pub updated_at: DateTimeWithTimeZone,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::health_records::Entity")]
HealthRecords,
}
impl Related<super::health_records::Entity> for Entity {
fn to() -> RelationDef {
Relation::HealthRecords.def()
}
}
impl ActiveModelBehavior for ActiveModel {}
@@ -0,0 +1,351 @@
pub mod cleanup;
pub mod entity;
pub mod operations;
use std::fmt;
use sea_orm::{
prelude::*, sea_query::OnConflict, ColumnTrait as _, DatabaseConnection, DbErr, EntityTrait,
QueryFilter as _, Set, SqlxSqliteConnector, Statement, TransactionTrait as _,
};
use sea_orm_migration::MigratorTrait as _;
use serde::{Deserialize, Serialize};
use sqlx::{migrate::MigrateDatabase as _, Sqlite, SqlitePool};
use crate::migrator;
#[derive(Debug, Clone)]
pub struct Db {
db_path: String,
db: SqlitePool,
orm_db: DatabaseConnection,
}
impl Db {
pub async fn new<T: ToString>(db_path: T) -> anyhow::Result<Self> {
let db = Self::prepare_db(db_path.to_string().as_str()).await?;
let orm_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(db.clone());
// 运行数据库迁移
migrator::Migrator::up(&orm_db, None).await?;
// 优化 SQLite 性能
Self::optimize_sqlite(&orm_db).await?;
Ok(Self {
db_path: db_path.to_string(),
db,
orm_db,
})
}
pub async fn memory_db() -> Self {
Self::new(":memory:").await.unwrap()
}
#[tracing::instrument(ret)]
async fn prepare_db(db_path: &str) -> anyhow::Result<SqlitePool> {
if !Sqlite::database_exists(db_path).await.unwrap_or(false) {
tracing::info!("Database not found, creating a new one");
Sqlite::create_database(db_path).await?;
}
let db = sqlx::pool::PoolOptions::new()
.max_lifetime(None)
.idle_timeout(None)
.connect(db_path)
.await?;
Ok(db)
}
async fn optimize_sqlite(db: &DatabaseConnection) -> Result<(), DbErr> {
// 优化 SQLite 性能
let pragmas = vec![
"PRAGMA journal_mode = WAL", // 使用 WAL 模式提高并发性能
"PRAGMA synchronous = NORMAL", // 平衡性能和数据安全
"PRAGMA cache_size = 10000", // 增加缓存大小
"PRAGMA temp_store = memory", // 临时存储使用内存
"PRAGMA mmap_size = 268435456", // 内存映射大小 256MB
"PRAGMA foreign_keys = ON", // 启用外键约束
];
for pragma in pragmas {
db.execute(sea_orm::Statement::from_string(
sea_orm::DatabaseBackend::Sqlite,
pragma.to_string(),
))
.await?;
}
Ok(())
}
pub fn inner(&self) -> SqlitePool {
self.db.clone()
}
pub fn orm_db(&self) -> &DatabaseConnection {
&self.orm_db
}
/// 清理旧的健康度记录(删除30天前的记录)
pub async fn cleanup_old_health_records(&self) -> Result<u64, DbErr> {
use chrono::Duration;
use entity::health_records;
let cutoff_date = chrono::Utc::now().naive_utc() - Duration::days(30);
let result = health_records::Entity::delete_many()
.filter(health_records::Column::CheckedAt.lt(cutoff_date))
.exec(self.orm_db())
.await?;
Ok(result.rows_affected)
}
/// 获取数据库统计信息
pub async fn get_database_stats(&self) -> anyhow::Result<DatabaseStats> {
use entity::{health_records, shared_nodes};
let node_count = shared_nodes::Entity::find().count(self.orm_db()).await?;
let health_record_count = health_records::Entity::find().count(self.orm_db()).await?;
let active_nodes_count = shared_nodes::Entity::find()
.filter(shared_nodes::Column::IsActive.eq(true))
.count(self.orm_db())
.await?;
Ok(DatabaseStats {
total_nodes: node_count,
active_nodes: active_nodes_count,
total_health_records: health_record_count,
})
}
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct DatabaseStats {
pub total_nodes: u64,
pub active_nodes: u64,
pub total_health_records: u64,
}
/// 健康状态枚举
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
/// 健康状态
Healthy,
/// 不健康状态
Unhealthy,
/// 超时状态
Timeout,
/// 连接错误
ConnectionError,
/// 未知错误
Unknown,
}
impl fmt::Display for HealthStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
HealthStatus::Healthy => write!(f, "healthy"),
HealthStatus::Unhealthy => write!(f, "unhealthy"),
HealthStatus::Timeout => write!(f, "timeout"),
HealthStatus::ConnectionError => write!(f, "connection_error"),
HealthStatus::Unknown => write!(f, "unknown"),
}
}
}
impl From<String> for HealthStatus {
fn from(s: String) -> Self {
match s.to_lowercase().as_str() {
"healthy" => HealthStatus::Healthy,
"unhealthy" => HealthStatus::Unhealthy,
"timeout" => HealthStatus::Timeout,
"connection_error" => HealthStatus::ConnectionError,
_ => HealthStatus::Unknown,
}
}
}
impl From<&str> for HealthStatus {
fn from(s: &str) -> Self {
HealthStatus::from(s.to_string())
}
}
/// 健康统计信息
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStats {
/// 总检查次数
pub total_checks: u64,
/// 健康检查次数
pub healthy_count: u64,
/// 不健康检查次数
pub unhealthy_count: u64,
/// 健康百分比
pub health_percentage: f64,
/// 平均响应时间(毫秒)
pub average_response_time: Option<f64>,
/// 正常运行时间百分比
pub uptime_percentage: f64,
/// 最后检查时间
pub last_check_time: Option<chrono::DateTime<chrono::Utc>>,
/// 最后健康状态
pub last_status: Option<HealthStatus>,
}
impl Default for HealthStats {
fn default() -> Self {
Self {
total_checks: 0,
healthy_count: 0,
unhealthy_count: 0,
health_percentage: 0.0,
average_response_time: None,
uptime_percentage: 0.0,
last_check_time: None,
last_status: None,
}
}
}
impl HealthStats {
/// 从健康记录列表创建统计信息
pub fn from_records(records: &[self::entity::health_records::Model]) -> Self {
if records.is_empty() {
return Self::default();
}
let total_checks = records.len() as u64;
let healthy_count = records.iter().filter(|r| r.is_healthy()).count() as u64;
let unhealthy_count = total_checks - healthy_count;
let health_percentage = if total_checks > 0 {
(healthy_count as f64 / total_checks as f64) * 100.0
} else {
0.0
};
// 计算平均响应时间(只计算健康状态的记录)
let healthy_records: Vec<_> = records
.iter()
.filter(|r| r.is_healthy() && r.response_time > 0)
.collect();
let average_response_time = if !healthy_records.is_empty() {
let total_time: i32 = healthy_records.iter().map(|r| r.response_time).sum();
Some(total_time as f64 / healthy_records.len() as f64)
} else {
None
};
// 正常运行时间百分比(基于健康状态)
let uptime_percentage = health_percentage;
// 获取最后的检查信息
let last_record = records.first(); // records 应该按时间倒序排列
let last_check_time = last_record.map(|r| r.checked_at.into());
let last_status = last_record.map(|r| HealthStatus::from(r.status.clone()));
Self {
total_checks,
healthy_count,
unhealthy_count,
health_percentage,
average_response_time,
uptime_percentage,
last_check_time,
last_status,
}
}
}
/// Model 的扩展方法
impl entity::health_records::Model {
/// 检查记录是否为健康状态
pub fn is_healthy(&self) -> bool {
let status = HealthStatus::from(self.status.clone());
matches!(status, HealthStatus::Healthy)
}
/// 创建新的活动模型
pub fn new_active_model(
node_id: i32,
status: HealthStatus,
response_time: Option<i32>,
error_message: Option<String>,
) -> entity::health_records::ActiveModel {
entity::health_records::ActiveModel {
node_id: Set(node_id),
status: Set(status.to_string()),
response_time: Set(response_time.unwrap_or(0)),
error_message: Set(error_message.unwrap_or_default()),
checked_at: Set(chrono::Utc::now().fixed_offset()),
..Default::default()
}
}
/// 获取健康状态
pub fn get_status(&self) -> HealthStatus {
HealthStatus::from(self.status.clone())
}
}
/// Model 的扩展方法
impl entity::shared_nodes::Model {
/// 创建新的活动模型
#[allow(clippy::too_many_arguments)]
pub fn new_active_model(
name: String,
host: String,
port: i32,
protocol: String,
version: Option<String>,
description: Option<String>,
max_connections: i32,
allow_relay: bool,
network_name: String,
network_secret: Option<String>,
) -> entity::shared_nodes::ActiveModel {
let now = chrono::Utc::now().fixed_offset();
entity::shared_nodes::ActiveModel {
name: Set(name),
host: Set(host),
port: Set(port),
protocol: Set(protocol),
version: Set(version.unwrap_or_default()),
description: Set(description.unwrap_or_default()),
max_connections: Set(max_connections),
current_connections: Set(0),
is_active: Set(true),
is_approved: Set(false),
allow_relay: Set(allow_relay),
network_name: Set(network_name),
network_secret: Set(network_secret.unwrap_or_default()),
created_at: Set(now),
updated_at: Set(now),
..Default::default()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter as _};
#[tokio::test]
async fn test_database_creation() {
let db = Db::memory_db().await;
let stats = db.get_database_stats().await.unwrap();
// 初始状态下应该没有记录
assert_eq!(stats.total_nodes, 0);
assert_eq!(stats.active_nodes, 0);
assert_eq!(stats.total_health_records, 0);
}
}
@@ -0,0 +1,343 @@
use crate::api::CreateNodeRequest;
use crate::db::entity::*;
use crate::db::Db;
use crate::db::HealthStats;
use crate::db::HealthStatus;
use sea_orm::*;
/// 节点管理操作
pub struct NodeOperations;
impl NodeOperations {
pub fn create_node_model(req: CreateNodeRequest) -> shared_nodes::ActiveModel {
shared_nodes::ActiveModel {
id: NotSet,
name: Set(req.name),
host: Set(req.host),
port: Set(req.port),
protocol: Set(req.protocol),
version: Set("".to_string()),
description: Set(req.description.unwrap_or_default()),
max_connections: Set(req.max_connections),
current_connections: Set(0),
is_active: Set(false),
is_approved: Set(false),
allow_relay: Set(req.allow_relay),
network_name: Set(req.network_name),
network_secret: Set(req.network_secret.unwrap_or_default()),
qq_number: Set(req.qq_number.unwrap_or_default()),
wechat: Set(req.wechat.unwrap_or_default()),
mail: Set(req.mail.unwrap_or_default()),
created_at: Set(chrono::Utc::now().fixed_offset()),
updated_at: Set(chrono::Utc::now().fixed_offset()),
}
}
/// 创建新节点
pub async fn create_node(
db: &Db,
req: CreateNodeRequest,
) -> Result<shared_nodes::Model, DbErr> {
let node = Self::create_node_model(req);
let insert_result = shared_nodes::Entity::insert(node).exec(db.orm_db()).await?;
shared_nodes::Entity::find_by_id(insert_result.last_insert_id)
.one(db.orm_db())
.await?
.ok_or(DbErr::RecordNotFound(
"Failed to retrieve created node".to_string(),
))
}
/// 获取所有节点
pub async fn get_all_nodes(db: &Db) -> Result<Vec<shared_nodes::Model>, DbErr> {
shared_nodes::Entity::find()
.order_by_asc(shared_nodes::Column::Id)
.all(db.orm_db())
.await
}
/// 根据ID获取节点
pub async fn get_node_by_id(db: &Db, id: i32) -> Result<Option<shared_nodes::Model>, DbErr> {
shared_nodes::Entity::find_by_id(id).one(db.orm_db()).await
}
/// 更新节点状态
pub async fn update_node_status(
db: &Db,
id: i32,
is_active: bool,
current_connections: Option<i32>,
) -> Result<shared_nodes::Model, DbErr> {
let mut node = shared_nodes::Entity::find_by_id(id)
.one(db.orm_db())
.await?
.ok_or(DbErr::RecordNotFound("Node not found".to_string()))?;
let mut node = node.into_active_model();
node.is_active = Set(is_active);
if let Some(connections) = current_connections {
node.current_connections = Set(connections);
}
node.updated_at = Set(chrono::Utc::now().fixed_offset());
let updated_node = shared_nodes::Entity::update(node).exec(db.orm_db()).await?;
Ok(updated_node)
}
/// 删除节点
pub async fn delete_node(db: &Db, id: i32) -> Result<u64, DbErr> {
let result = shared_nodes::Entity::delete_by_id(id)
.exec(db.orm_db())
.await?;
Ok(result.rows_affected)
}
/// 获取活跃节点
pub async fn get_active_nodes(db: &Db) -> Result<Vec<shared_nodes::Model>, DbErr> {
shared_nodes::Entity::find()
.filter(shared_nodes::Column::IsActive.eq(true))
.order_by_asc(shared_nodes::Column::Id)
.all(db.orm_db())
.await
}
/// 检查节点是否存在(根据host、port、protocol
pub async fn node_exists(
db: &Db,
host: &str,
port: i32,
protocol: &str,
) -> Result<bool, DbErr> {
let count = shared_nodes::Entity::find()
.filter(shared_nodes::Column::Host.eq(host))
.filter(shared_nodes::Column::Port.eq(port))
.filter(shared_nodes::Column::Protocol.eq(protocol))
.count(db.orm_db())
.await?;
Ok(count > 0)
}
pub async fn update_node_version(
db: &Db,
node_id: i32,
version: String,
) -> Result<shared_nodes::Model, DbErr> {
let mut node = shared_nodes::Entity::find_by_id(node_id)
.one(db.orm_db())
.await?
.ok_or(DbErr::RecordNotFound("Node not found".to_string()))?;
let mut node = node.into_active_model();
node.version = Set(version);
node.updated_at = Set(chrono::Utc::now().fixed_offset());
let updated_node = shared_nodes::Entity::update(node).exec(db.orm_db()).await?;
Ok(updated_node)
}
}
/// 健康记录操作
pub struct HealthOperations;
impl HealthOperations {
/// 创建健康记录
pub async fn create_health_record(
db: &Db,
node_id: i32,
status: HealthStatus,
response_time: Option<i32>,
error_message: Option<String>,
) -> Result<health_records::Model, DbErr> {
let record =
health_records::Model::new_active_model(node_id, status, response_time, error_message);
let insert_result = health_records::Entity::insert(record)
.exec(db.orm_db())
.await?;
health_records::Entity::find_by_id(insert_result.last_insert_id)
.one(db.orm_db())
.await?
.ok_or(DbErr::RecordNotFound(
"Failed to retrieve created health record".to_string(),
))
}
/// 获取节点的健康记录
pub async fn get_node_health_records(
db: &Db,
node_id: i32,
from_date: Option<chrono::NaiveDateTime>,
limit: Option<u64>,
) -> Result<Vec<health_records::Model>, DbErr> {
let mut query = health_records::Entity::find()
.filter(health_records::Column::NodeId.eq(node_id))
.order_by_desc(health_records::Column::CheckedAt);
if let Some(from_date) = from_date {
query = query.filter(health_records::Column::CheckedAt.gte(from_date));
}
if let Some(limit) = limit {
query = query.limit(Some(limit));
}
query.all(db.orm_db()).await
}
/// 获取节点最近的健康状态
pub async fn get_latest_health_status(
db: &Db,
node_id: i32,
) -> Result<Option<health_records::Model>, DbErr> {
health_records::Entity::find()
.filter(health_records::Column::NodeId.eq(node_id))
.order_by_desc(health_records::Column::CheckedAt)
.one(db.orm_db())
.await
}
/// 获取健康统计信息
pub async fn get_health_stats(db: &Db, node_id: i32, hours: i64) -> Result<HealthStats, DbErr> {
let since = chrono::Utc::now().naive_utc() - chrono::Duration::hours(hours);
let records = health_records::Entity::find()
.filter(health_records::Column::NodeId.eq(node_id))
.filter(health_records::Column::CheckedAt.gte(since))
.order_by_desc(health_records::Column::CheckedAt)
.all(db.orm_db())
.await?;
Ok(HealthStats::from_records(&records))
}
/// 清理旧的健康记录
pub async fn cleanup_old_records(db: &Db, days: i64) -> Result<u64, DbErr> {
let cutoff = chrono::Utc::now().naive_utc() - chrono::Duration::days(days);
let result = health_records::Entity::delete_many()
.filter(health_records::Column::CheckedAt.lt(cutoff))
.exec(db.orm_db())
.await?;
Ok(result.rows_affected)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Db;
#[tokio::test]
async fn test_node_operations() {
let db = Db::memory_db().await;
let req = CreateNodeRequest {
name: "Test Node".to_string(),
host: "test.example.com".to_string(),
port: 11010,
protocol: "tcp".to_string(),
description: Some("Test node".to_string()),
max_connections: 100,
allow_relay: false,
network_name: "test-network".to_string(),
network_secret: Some("test-secret".to_string()),
qq_number: Some("123456789".to_string()),
wechat: Some("test_wechat".to_string()),
mail: Some("test@example.com".to_string()),
};
// 测试创建节点
let node = NodeOperations::create_node(&db, req).await.unwrap();
assert_eq!(node.name, "Test Node");
assert_eq!(node.host, "test.example.com");
assert_eq!(node.port, 11010);
assert!(node.is_active);
// 测试获取节点
let found_node = NodeOperations::get_node_by_id(&db, node.id).await.unwrap();
assert!(found_node.is_some());
assert_eq!(found_node.unwrap().id, node.id);
// 测试获取所有节点
let all_nodes = NodeOperations::get_all_nodes(&db).await.unwrap();
assert_eq!(all_nodes.len(), 1);
// 测试节点存在性检查
let exists = NodeOperations::node_exists(&db, "test.example.com", 11010, "tcp")
.await
.unwrap();
assert!(exists);
let not_exists = NodeOperations::node_exists(&db, "nonexistent.com", 8080, "tcp")
.await
.unwrap();
assert!(!not_exists);
}
#[tokio::test]
async fn test_health_operations() {
let db = Db::memory_db().await;
let req = CreateNodeRequest {
name: "Test Node".to_string(),
host: "test.example.com".to_string(),
port: 11010,
protocol: "tcp".to_string(),
description: Some("Test node".to_string()),
max_connections: 100,
allow_relay: false,
network_name: "test-network".to_string(),
network_secret: Some("test-secret".to_string()),
qq_number: Some("123456789".to_string()),
wechat: Some("test_wechat".to_string()),
mail: Some("test@example.com".to_string()),
};
// 创建测试节点
let node = NodeOperations::create_node(&db, req).await.unwrap();
// 测试创建健康记录
let record = HealthOperations::create_health_record(
&db,
node.id,
HealthStatus::Healthy,
Some(100),
None,
)
.await
.unwrap();
assert_eq!(record.node_id, node.id);
assert!(record.is_healthy());
assert_eq!(record.response_time, 100);
// 测试获取健康记录
let records = HealthOperations::get_node_health_records(&db, node.id, None, None)
.await
.unwrap();
assert_eq!(records.len(), 1);
// 测试获取最新状态
let latest = HealthOperations::get_latest_health_status(&db, node.id)
.await
.unwrap();
assert!(latest.is_some());
assert_eq!(latest.unwrap().id, record.id);
// 测试健康统计
let stats = HealthOperations::get_health_stats(&db, node.id, 24)
.await
.unwrap();
assert_eq!(stats.total_checks, 1);
assert_eq!(stats.healthy_count, 1);
assert_eq!(stats.health_percentage, 100.0);
}
}