mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-13 17:35:37 +00:00
fix peer center for latency report
This commit is contained in:
@@ -1,24 +1,23 @@
|
||||
use std::{
|
||||
collections::hash_map::DefaultHasher,
|
||||
hash::{Hash, Hasher},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::{Duration, SystemTime},
|
||||
collections::BTreeSet,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant, SystemTime},
|
||||
};
|
||||
|
||||
use crossbeam::atomic::AtomicCell;
|
||||
use futures::Future;
|
||||
use tokio::{
|
||||
sync::{Mutex, RwLock},
|
||||
task::JoinSet,
|
||||
};
|
||||
use std::sync::RwLock;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::Instrument;
|
||||
|
||||
use crate::{
|
||||
common::PeerId,
|
||||
peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService},
|
||||
peers::{
|
||||
peer_manager::PeerManager,
|
||||
route_trait::{RouteCostCalculator, RouteCostCalculatorInterface},
|
||||
rpc_service::PeerManagerRpcService,
|
||||
},
|
||||
rpc::{GetGlobalPeerMapRequest, GetGlobalPeerMapResponse},
|
||||
};
|
||||
|
||||
@@ -34,7 +33,8 @@ struct PeerCenterBase {
|
||||
lock: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
static SERVICE_ID: u32 = 5;
|
||||
// static SERVICE_ID: u32 = 5; for compatibility with the original code
|
||||
static SERVICE_ID: u32 = 50;
|
||||
|
||||
struct PeridicJobCtx<T> {
|
||||
peer_mgr: Arc<PeerManager>,
|
||||
@@ -132,7 +132,7 @@ impl PeerCenterBase {
|
||||
|
||||
pub struct PeerCenterInstanceService {
|
||||
global_peer_map: Arc<RwLock<GlobalPeerMap>>,
|
||||
global_peer_map_digest: Arc<RwLock<Digest>>,
|
||||
global_peer_map_digest: Arc<AtomicCell<Digest>>,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
@@ -141,7 +141,7 @@ impl crate::rpc::cli::peer_center_rpc_server::PeerCenterRpc for PeerCenterInstan
|
||||
&self,
|
||||
_request: tonic::Request<GetGlobalPeerMapRequest>,
|
||||
) -> Result<tonic::Response<GetGlobalPeerMapResponse>, tonic::Status> {
|
||||
let global_peer_map = self.global_peer_map.read().await.clone();
|
||||
let global_peer_map = self.global_peer_map.read().unwrap().clone();
|
||||
Ok(tonic::Response::new(GetGlobalPeerMapResponse {
|
||||
global_peer_map: global_peer_map
|
||||
.map
|
||||
@@ -157,7 +157,8 @@ pub struct PeerCenterInstance {
|
||||
|
||||
client: Arc<PeerCenterBase>,
|
||||
global_peer_map: Arc<RwLock<GlobalPeerMap>>,
|
||||
global_peer_map_digest: Arc<RwLock<Digest>>,
|
||||
global_peer_map_digest: Arc<AtomicCell<Digest>>,
|
||||
global_peer_map_update_time: Arc<AtomicCell<Instant>>,
|
||||
}
|
||||
|
||||
impl PeerCenterInstance {
|
||||
@@ -166,7 +167,8 @@ impl PeerCenterInstance {
|
||||
peer_mgr: peer_mgr.clone(),
|
||||
client: Arc::new(PeerCenterBase::new(peer_mgr.clone())),
|
||||
global_peer_map: Arc::new(RwLock::new(GlobalPeerMap::new())),
|
||||
global_peer_map_digest: Arc::new(RwLock::new(Digest::default())),
|
||||
global_peer_map_digest: Arc::new(AtomicCell::new(Digest::default())),
|
||||
global_peer_map_update_time: Arc::new(AtomicCell::new(Instant::now())),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,12 +181,14 @@ impl PeerCenterInstance {
|
||||
async fn init_get_global_info_job(&self) {
|
||||
struct Ctx {
|
||||
global_peer_map: Arc<RwLock<GlobalPeerMap>>,
|
||||
global_peer_map_digest: Arc<RwLock<Digest>>,
|
||||
global_peer_map_digest: Arc<AtomicCell<Digest>>,
|
||||
global_peer_map_update_time: Arc<AtomicCell<Instant>>,
|
||||
}
|
||||
|
||||
let ctx = Arc::new(Ctx {
|
||||
global_peer_map: self.global_peer_map.clone(),
|
||||
global_peer_map_digest: self.global_peer_map_digest.clone(),
|
||||
global_peer_map_update_time: self.global_peer_map_update_time.clone(),
|
||||
});
|
||||
|
||||
self.client
|
||||
@@ -193,10 +197,7 @@ impl PeerCenterInstance {
|
||||
rpc_ctx.deadline = SystemTime::now() + Duration::from_secs(3);
|
||||
|
||||
let ret = client
|
||||
.get_global_peer_map(
|
||||
rpc_ctx,
|
||||
ctx.job_ctx.global_peer_map_digest.read().await.clone(),
|
||||
)
|
||||
.get_global_peer_map(rpc_ctx, ctx.job_ctx.global_peer_map_digest.load())
|
||||
.await?;
|
||||
|
||||
let Ok(resp) = ret else {
|
||||
@@ -217,10 +218,13 @@ impl PeerCenterInstance {
|
||||
resp.digest
|
||||
);
|
||||
|
||||
*ctx.job_ctx.global_peer_map.write().await = resp.global_peer_map;
|
||||
*ctx.job_ctx.global_peer_map_digest.write().await = resp.digest;
|
||||
*ctx.job_ctx.global_peer_map.write().unwrap() = resp.global_peer_map;
|
||||
ctx.job_ctx.global_peer_map_digest.store(resp.digest);
|
||||
ctx.job_ctx
|
||||
.global_peer_map_update_time
|
||||
.store(Instant::now());
|
||||
|
||||
Ok(10000)
|
||||
Ok(5000)
|
||||
})
|
||||
.await;
|
||||
}
|
||||
@@ -228,67 +232,53 @@ impl PeerCenterInstance {
|
||||
async fn init_report_peers_job(&self) {
|
||||
struct Ctx {
|
||||
service: PeerManagerRpcService,
|
||||
need_send_peers: AtomicBool,
|
||||
last_report_peers: Mutex<PeerInfoForGlobalMap>,
|
||||
|
||||
last_report_peers: Mutex<BTreeSet<PeerId>>,
|
||||
|
||||
last_center_peer: AtomicCell<PeerId>,
|
||||
last_report_time: AtomicCell<Instant>,
|
||||
}
|
||||
let ctx = Arc::new(Ctx {
|
||||
service: PeerManagerRpcService::new(self.peer_mgr.clone()),
|
||||
need_send_peers: AtomicBool::new(true),
|
||||
last_report_peers: Mutex::new(PeerInfoForGlobalMap::default()),
|
||||
last_report_peers: Mutex::new(BTreeSet::new()),
|
||||
last_center_peer: AtomicCell::new(PeerId::default()),
|
||||
last_report_time: AtomicCell::new(Instant::now()),
|
||||
});
|
||||
|
||||
self.client
|
||||
.init_periodic_job(ctx, |client, ctx| async move {
|
||||
let my_node_id = ctx.peer_mgr.my_peer_id();
|
||||
let peers: PeerInfoForGlobalMap = ctx.job_ctx.service.list_peers().await.into();
|
||||
let peer_list = peers.direct_peers.keys().map(|k| *k).collect();
|
||||
let job_ctx = &ctx.job_ctx;
|
||||
|
||||
// if peers are not same in next 10 seconds, report peers to center server
|
||||
let mut peers = PeerInfoForGlobalMap::default();
|
||||
for _ in 1..10 {
|
||||
peers = ctx.job_ctx.service.list_peers().await.into();
|
||||
if ctx.center_peer.load() != ctx.job_ctx.last_center_peer.load() {
|
||||
// if center peer changed, report peers immediately
|
||||
break;
|
||||
}
|
||||
if peers == *ctx.job_ctx.last_report_peers.lock().await {
|
||||
return Ok(3000);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
// only report when:
|
||||
// 1. center peer changed
|
||||
// 2. last report time is more than 60 seconds
|
||||
// 3. peers changed
|
||||
if ctx.center_peer.load() == ctx.job_ctx.last_center_peer.load()
|
||||
&& job_ctx.last_report_time.load().elapsed().as_secs() < 60
|
||||
&& *job_ctx.last_report_peers.lock().await == peer_list
|
||||
{
|
||||
return Ok(5000);
|
||||
}
|
||||
|
||||
*ctx.job_ctx.last_report_peers.lock().await = peers.clone();
|
||||
let mut hasher = DefaultHasher::new();
|
||||
peers.hash(&mut hasher);
|
||||
|
||||
let peers = if ctx.job_ctx.need_send_peers.load(Ordering::Relaxed) {
|
||||
Some(peers)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mut rpc_ctx = tarpc::context::current();
|
||||
rpc_ctx.deadline = SystemTime::now() + Duration::from_secs(3);
|
||||
|
||||
let ret = client
|
||||
.report_peers(
|
||||
rpc_ctx,
|
||||
my_node_id.clone(),
|
||||
peers,
|
||||
hasher.finish() as Digest,
|
||||
)
|
||||
.report_peers(rpc_ctx, my_node_id.clone(), peers)
|
||||
.await?;
|
||||
|
||||
if matches!(ret.as_ref().err(), Some(Error::DigestMismatch)) {
|
||||
ctx.job_ctx.need_send_peers.store(true, Ordering::Relaxed);
|
||||
return Ok(0);
|
||||
} else if ret.is_err() {
|
||||
if ret.is_ok() {
|
||||
ctx.job_ctx.last_center_peer.store(ctx.center_peer.load());
|
||||
*ctx.job_ctx.last_report_peers.lock().await = peer_list;
|
||||
ctx.job_ctx.last_report_time.store(Instant::now());
|
||||
} else {
|
||||
tracing::error!("report peers to center server got error result: {:?}", ret);
|
||||
return Ok(500);
|
||||
}
|
||||
|
||||
ctx.job_ctx.last_center_peer.store(ctx.center_peer.load());
|
||||
ctx.job_ctx.need_send_peers.store(false, Ordering::Relaxed);
|
||||
Ok(3000)
|
||||
Ok(5000)
|
||||
})
|
||||
.await;
|
||||
}
|
||||
@@ -299,15 +289,62 @@ impl PeerCenterInstance {
|
||||
global_peer_map_digest: self.global_peer_map_digest.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_cost_calculator(&self) -> RouteCostCalculator {
|
||||
struct RouteCostCalculatorImpl {
|
||||
global_peer_map: Arc<RwLock<GlobalPeerMap>>,
|
||||
|
||||
global_peer_map_clone: GlobalPeerMap,
|
||||
|
||||
last_update_time: AtomicCell<Instant>,
|
||||
global_peer_map_update_time: Arc<AtomicCell<Instant>>,
|
||||
}
|
||||
|
||||
impl RouteCostCalculatorInterface for RouteCostCalculatorImpl {
|
||||
fn calculate_cost(&self, src: PeerId, dst: PeerId) -> i32 {
|
||||
let ret = self
|
||||
.global_peer_map_clone
|
||||
.map
|
||||
.get(&src)
|
||||
.and_then(|src_peer_info| src_peer_info.direct_peers.get(&dst))
|
||||
.and_then(|info| Some(info.latency_ms));
|
||||
ret.unwrap_or(i32::MAX)
|
||||
}
|
||||
|
||||
fn begin_update(&mut self) {
|
||||
let global_peer_map = self.global_peer_map.read().unwrap();
|
||||
self.global_peer_map_clone = global_peer_map.clone();
|
||||
}
|
||||
|
||||
fn end_update(&mut self) {
|
||||
self.last_update_time
|
||||
.store(self.global_peer_map_update_time.load());
|
||||
}
|
||||
|
||||
fn need_update(&self) -> bool {
|
||||
self.last_update_time.load() < self.global_peer_map_update_time.load()
|
||||
}
|
||||
}
|
||||
|
||||
Box::new(RouteCostCalculatorImpl {
|
||||
global_peer_map: self.global_peer_map.clone(),
|
||||
global_peer_map_clone: GlobalPeerMap::new(),
|
||||
last_update_time: AtomicCell::new(
|
||||
self.global_peer_map_update_time.load() - Duration::from_secs(1),
|
||||
),
|
||||
global_peer_map_update_time: self.global_peer_map_update_time.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::ops::Deref;
|
||||
|
||||
use crate::{
|
||||
peer_center::server::get_global_data,
|
||||
peers::tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
|
||||
peers::tests::{
|
||||
connect_peer_manager, create_mock_peer_manager, wait_for_condition, wait_route_appear,
|
||||
},
|
||||
tunnel::common::tests::enable_log,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
@@ -340,43 +377,64 @@ mod tests {
|
||||
let center_data = get_global_data(center_peer);
|
||||
|
||||
// wait center_data has 3 records for 10 seconds
|
||||
let now = std::time::Instant::now();
|
||||
loop {
|
||||
if center_data.read().await.global_peer_map.map.len() == 3 {
|
||||
println!(
|
||||
"center data ready, {:#?}",
|
||||
center_data.read().await.global_peer_map
|
||||
);
|
||||
break;
|
||||
}
|
||||
if now.elapsed().as_secs() > 60 {
|
||||
panic!("center data not ready");
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
wait_for_condition(
|
||||
|| async {
|
||||
if center_data.global_peer_map.len() == 4 {
|
||||
println!("center data {:#?}", center_data.global_peer_map);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
},
|
||||
Duration::from_secs(10),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut digest = None;
|
||||
for pc in peer_centers.iter() {
|
||||
let rpc_service = pc.get_rpc_service();
|
||||
let now = std::time::Instant::now();
|
||||
while now.elapsed().as_secs() < 10 {
|
||||
if rpc_service.global_peer_map.read().await.map.len() == 3 {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
assert_eq!(rpc_service.global_peer_map.read().await.map.len(), 3);
|
||||
wait_for_condition(
|
||||
|| async { rpc_service.global_peer_map.read().unwrap().map.len() == 3 },
|
||||
Duration::from_secs(10),
|
||||
)
|
||||
.await;
|
||||
|
||||
println!("rpc service ready, {:#?}", rpc_service.global_peer_map);
|
||||
|
||||
if digest.is_none() {
|
||||
digest = Some(rpc_service.global_peer_map_digest.read().await.clone());
|
||||
digest = Some(rpc_service.global_peer_map_digest.load());
|
||||
} else {
|
||||
let v = rpc_service.global_peer_map_digest.read().await;
|
||||
assert_eq!(digest.as_ref().unwrap(), v.deref());
|
||||
let v = rpc_service.global_peer_map_digest.load();
|
||||
assert_eq!(digest.unwrap(), v);
|
||||
}
|
||||
|
||||
let mut route_cost = pc.get_cost_calculator();
|
||||
assert!(route_cost.need_update());
|
||||
|
||||
route_cost.begin_update();
|
||||
assert!(
|
||||
route_cost.calculate_cost(peer_mgr_a.my_peer_id(), peer_mgr_b.my_peer_id()) < 30
|
||||
);
|
||||
assert!(
|
||||
route_cost.calculate_cost(peer_mgr_b.my_peer_id(), peer_mgr_a.my_peer_id()) < 30
|
||||
);
|
||||
assert!(
|
||||
route_cost.calculate_cost(peer_mgr_b.my_peer_id(), peer_mgr_c.my_peer_id()) < 30
|
||||
);
|
||||
assert!(
|
||||
route_cost.calculate_cost(peer_mgr_c.my_peer_id(), peer_mgr_b.my_peer_id()) < 30
|
||||
);
|
||||
assert!(
|
||||
route_cost.calculate_cost(peer_mgr_c.my_peer_id(), peer_mgr_a.my_peer_id()) > 10000
|
||||
);
|
||||
assert!(
|
||||
route_cost.calculate_cost(peer_mgr_a.my_peer_id(), peer_mgr_c.my_peer_id()) > 10000
|
||||
);
|
||||
route_cost.end_update();
|
||||
assert!(!route_cost.need_update());
|
||||
}
|
||||
|
||||
let global_digest = get_global_data(center_peer).read().await.digest.clone();
|
||||
let global_digest = get_global_data(center_peer).digest.load();
|
||||
assert_eq!(digest.as_ref().unwrap(), &global_digest);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user