mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-07 10:14:35 +00:00
use peer center instance to gatter peers info (#21)
* use peer center instance to gatter peers info
This commit is contained in:
@@ -9,20 +9,27 @@ use std::{
|
||||
};
|
||||
|
||||
use futures::Future;
|
||||
use tokio::{sync::Mutex, task::JoinSet};
|
||||
use tokio::{
|
||||
sync::{Mutex, RwLock},
|
||||
task::JoinSet,
|
||||
};
|
||||
use tracing::Instrument;
|
||||
|
||||
use crate::peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService, PeerId};
|
||||
use crate::{
|
||||
peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService, PeerId},
|
||||
rpc::{GetGlobalPeerMapRequest, GetGlobalPeerMapResponse},
|
||||
};
|
||||
|
||||
use super::{
|
||||
server::PeerCenterServer,
|
||||
service::{PeerCenterService, PeerCenterServiceClient, PeerInfoForGlobalMap},
|
||||
service::{GlobalPeerMap, PeerCenterService, PeerCenterServiceClient, PeerInfoForGlobalMap},
|
||||
Digest, Error,
|
||||
};
|
||||
|
||||
pub struct PeerCenterClient {
|
||||
struct PeerCenterBase {
|
||||
peer_mgr: Arc<PeerManager>,
|
||||
tasks: Arc<Mutex<JoinSet<()>>>,
|
||||
lock: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
static SERVICE_ID: u32 = 5;
|
||||
@@ -32,7 +39,7 @@ struct PeridicJobCtx<T> {
|
||||
job_ctx: T,
|
||||
}
|
||||
|
||||
impl PeerCenterClient {
|
||||
impl PeerCenterBase {
|
||||
pub async fn init(&self) -> Result<(), Error> {
|
||||
self.peer_mgr.get_peer_rpc_mgr().run_service(
|
||||
SERVICE_ID,
|
||||
@@ -67,21 +74,22 @@ impl PeerCenterClient {
|
||||
) -> () {
|
||||
let my_node_id = self.peer_mgr.my_node_id();
|
||||
let peer_mgr = self.peer_mgr.clone();
|
||||
let lock = self.lock.clone();
|
||||
self.tasks.lock().await.spawn(
|
||||
async move {
|
||||
let ctx = Arc::new(PeridicJobCtx {
|
||||
peer_mgr: peer_mgr.clone(),
|
||||
job_ctx,
|
||||
});
|
||||
tracing::warn!(?my_node_id, "before periodic job loop");
|
||||
loop {
|
||||
let Some(center_peer) = Self::select_center_peer(&peer_mgr).await else {
|
||||
tracing::warn!("no center peer found, sleep 1 second");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
};
|
||||
tracing::warn!(?center_peer, "run periodic job");
|
||||
tracing::info!(?center_peer, "run periodic job");
|
||||
let rpc_mgr = peer_mgr.get_peer_rpc_mgr();
|
||||
let _g = lock.lock().await;
|
||||
let ret = rpc_mgr
|
||||
.do_client_rpc_scoped(SERVICE_ID, center_peer, |c| async {
|
||||
let client =
|
||||
@@ -90,6 +98,7 @@ impl PeerCenterClient {
|
||||
job_fn(client, ctx.clone()).await
|
||||
})
|
||||
.await;
|
||||
drop(_g);
|
||||
|
||||
let Ok(sleep_time_ms) = ret else {
|
||||
tracing::error!("periodic job to center server rpc failed: {:?}", ret);
|
||||
@@ -106,35 +115,85 @@ impl PeerCenterClient {
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn new(peer_mgr: Arc<PeerManager>) -> Self {
|
||||
PeerCenterClient {
|
||||
pub fn new(peer_mgr: Arc<PeerManager>) -> Self {
|
||||
PeerCenterBase {
|
||||
peer_mgr,
|
||||
tasks: Arc::new(Mutex::new(JoinSet::new())),
|
||||
lock: Arc::new(Mutex::new(())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PeerCenterInstance {
|
||||
pub struct PeerCenterInstanceService {
|
||||
global_peer_map: Arc<RwLock<GlobalPeerMap>>,
|
||||
global_peer_map_digest: Arc<RwLock<Digest>>,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl crate::rpc::cli::peer_center_rpc_server::PeerCenterRpc for PeerCenterInstanceService {
|
||||
async fn get_global_peer_map(
|
||||
&self,
|
||||
_request: tonic::Request<GetGlobalPeerMapRequest>,
|
||||
) -> Result<tonic::Response<GetGlobalPeerMapResponse>, tonic::Status> {
|
||||
let global_peer_map = self.global_peer_map.read().await.clone();
|
||||
Ok(tonic::Response::new(GetGlobalPeerMapResponse {
|
||||
global_peer_map: global_peer_map
|
||||
.map
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k.to_string(), v))
|
||||
.collect(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PeerCenterInstance {
|
||||
peer_mgr: Arc<PeerManager>,
|
||||
client: Arc<PeerCenterClient>,
|
||||
|
||||
client: Arc<PeerCenterBase>,
|
||||
global_peer_map: Arc<RwLock<GlobalPeerMap>>,
|
||||
global_peer_map_digest: Arc<RwLock<Digest>>,
|
||||
}
|
||||
|
||||
impl PeerCenterInstance {
|
||||
pub async fn new(peer_mgr: Arc<PeerManager>) -> Self {
|
||||
let client = Arc::new(PeerCenterClient::new(peer_mgr.clone()).await);
|
||||
client.init().await.unwrap();
|
||||
pub fn new(peer_mgr: Arc<PeerManager>) -> Self {
|
||||
PeerCenterInstance {
|
||||
peer_mgr: peer_mgr.clone(),
|
||||
client: Arc::new(PeerCenterBase::new(peer_mgr.clone())),
|
||||
global_peer_map: Arc::new(RwLock::new(GlobalPeerMap::new())),
|
||||
global_peer_map_digest: Arc::new(RwLock::new(Digest::default())),
|
||||
}
|
||||
}
|
||||
|
||||
PeerCenterInstance { peer_mgr, client }
|
||||
pub async fn init(&self) {
|
||||
self.client.init().await.unwrap();
|
||||
self.init_get_global_info_job().await;
|
||||
self.init_report_peers_job().await;
|
||||
}
|
||||
|
||||
async fn init_get_global_info_job(&self) {
|
||||
struct Ctx {
|
||||
global_peer_map: Arc<RwLock<GlobalPeerMap>>,
|
||||
global_peer_map_digest: Arc<RwLock<Digest>>,
|
||||
}
|
||||
|
||||
let ctx = Arc::new(Ctx {
|
||||
global_peer_map: self.global_peer_map.clone(),
|
||||
global_peer_map_digest: self.global_peer_map_digest.clone(),
|
||||
});
|
||||
|
||||
self.client
|
||||
.init_periodic_job({}, |client, _ctx| async move {
|
||||
.init_periodic_job(ctx, |client, ctx| async move {
|
||||
let mut rpc_ctx = tarpc::context::current();
|
||||
rpc_ctx.deadline = SystemTime::now() + Duration::from_secs(3);
|
||||
|
||||
let ret = client
|
||||
.get_global_peer_map(tarpc::context::current(), 0)
|
||||
.get_global_peer_map(
|
||||
rpc_ctx,
|
||||
ctx.job_ctx.global_peer_map_digest.read().await.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Ok(global_peer_map) = ret else {
|
||||
let Ok(resp) = ret else {
|
||||
tracing::error!(
|
||||
"get global info from center server got error result: {:?}",
|
||||
ret
|
||||
@@ -142,7 +201,18 @@ impl PeerCenterInstance {
|
||||
return Ok(1000);
|
||||
};
|
||||
|
||||
tracing::warn!("get global info from center server: {:?}", global_peer_map);
|
||||
let Some(resp) = resp else {
|
||||
return Ok(1000);
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"get global info from center server: {:?}, digest: {:?}",
|
||||
resp.global_peer_map,
|
||||
resp.digest
|
||||
);
|
||||
|
||||
*ctx.job_ctx.global_peer_map.write().await = resp.global_peer_map;
|
||||
*ctx.job_ctx.global_peer_map_digest.write().await = resp.digest;
|
||||
|
||||
Ok(5000)
|
||||
})
|
||||
@@ -196,10 +266,19 @@ impl PeerCenterInstance {
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
pub fn get_rpc_service(&self) -> PeerCenterInstanceService {
|
||||
PeerCenterInstanceService {
|
||||
global_peer_map: self.global_peer_map.clone(),
|
||||
global_peer_map_digest: self.global_peer_map_digest.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::ops::Deref;
|
||||
|
||||
use crate::{
|
||||
peer_center::server::get_global_data,
|
||||
peers::tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
|
||||
@@ -213,13 +292,14 @@ mod tests {
|
||||
let peer_mgr_b = create_mock_peer_manager().await;
|
||||
let peer_mgr_c = create_mock_peer_manager().await;
|
||||
|
||||
let peer_center_a = PeerCenterInstance::new(peer_mgr_a.clone()).await;
|
||||
let peer_center_b = PeerCenterInstance::new(peer_mgr_b.clone()).await;
|
||||
let peer_center_c = PeerCenterInstance::new(peer_mgr_c.clone()).await;
|
||||
let peer_center_a = PeerCenterInstance::new(peer_mgr_a.clone());
|
||||
let peer_center_b = PeerCenterInstance::new(peer_mgr_b.clone());
|
||||
let peer_center_c = PeerCenterInstance::new(peer_mgr_c.clone());
|
||||
|
||||
peer_center_a.init_report_peers_job().await;
|
||||
peer_center_b.init_report_peers_job().await;
|
||||
peer_center_c.init_report_peers_job().await;
|
||||
let peer_centers = vec![&peer_center_a, &peer_center_b, &peer_center_c];
|
||||
for pc in peer_centers.iter() {
|
||||
pc.init().await;
|
||||
}
|
||||
|
||||
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
|
||||
connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await;
|
||||
@@ -228,7 +308,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let center_peer = PeerCenterClient::select_center_peer(&peer_mgr_a)
|
||||
let center_peer = PeerCenterBase::select_center_peer(&peer_mgr_a)
|
||||
.await
|
||||
.unwrap();
|
||||
let center_data = get_global_data(center_peer);
|
||||
@@ -248,5 +328,29 @@ mod tests {
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
let mut digest = None;
|
||||
for pc in peer_centers.iter() {
|
||||
let rpc_service = pc.get_rpc_service();
|
||||
let now = std::time::Instant::now();
|
||||
while now.elapsed().as_secs() < 10 {
|
||||
if rpc_service.global_peer_map.read().await.map.len() == 3 {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
assert_eq!(rpc_service.global_peer_map.read().await.map.len(), 3);
|
||||
println!("rpc service ready, {:#?}", rpc_service.global_peer_map);
|
||||
|
||||
if digest.is_none() {
|
||||
digest = Some(rpc_service.global_peer_map_digest.read().await.clone());
|
||||
} else {
|
||||
let v = rpc_service.global_peer_map_digest.read().await;
|
||||
assert_eq!(digest.as_ref().unwrap(), v.deref());
|
||||
}
|
||||
}
|
||||
|
||||
let global_digest = get_global_data(center_peer).read().await.digest.clone();
|
||||
assert_eq!(digest.as_ref().unwrap(), &global_digest);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user