fix memory leak

This commit is contained in:
sijie.sun
2025-03-05 17:53:19 +08:00
committed by Sijie.Sun
parent 673c34cf5a
commit 568dca6f9c
5 changed files with 45 additions and 31 deletions
+19 -12
View File
@@ -4,7 +4,7 @@ use std::{
io::Write as _, io::Write as _,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use tokio::task::JoinSet; use tokio::{task::JoinSet, time::timeout};
use tracing::Instrument; use tracing::Instrument;
pub mod compressor; pub mod compressor;
@@ -47,16 +47,13 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
origin: String, origin: String,
) { ) {
let js = Arc::downgrade(&js); let js = Arc::downgrade(&js);
let o = origin.clone();
tokio::spawn( tokio::spawn(
async move { async move {
loop { while js.strong_count() > 0 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if js.weak_count() == 0 {
tracing::info!("joinset task exit");
break;
}
future::poll_fn(|cx| { let fut = future::poll_fn(|cx| {
let Some(js) = js.upgrade() else { let Some(js) = js.upgrade() else {
return std::task::Poll::Ready(()); return std::task::Poll::Ready(());
}; };
@@ -64,15 +61,24 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
let mut js = js.lock().unwrap(); let mut js = js.lock().unwrap();
while !js.is_empty() { while !js.is_empty() {
let ret = js.poll_join_next(cx); let ret = js.poll_join_next(cx);
if ret.is_pending() { match ret {
std::task::Poll::Ready(Some(_)) => {
continue;
}
std::task::Poll::Ready(None) => {
break;
}
std::task::Poll::Pending => {
return std::task::Poll::Pending; return std::task::Poll::Pending;
} }
} }
std::task::Poll::Ready(())
})
.await;
} }
std::task::Poll::Ready(())
});
let _ = timeout(std::time::Duration::from_secs(5), fut).await;
}
tracing::debug!(?o, "joinset task exit");
} }
.instrument(tracing::info_span!( .instrument(tracing::info_span!(
"join_joinset_background", "join_joinset_background",
@@ -167,5 +173,6 @@ mod tests {
drop(js); drop(js);
tokio::time::sleep(std::time::Duration::from_secs(2)).await; tokio::time::sleep(std::time::Duration::from_secs(2)).await;
assert_eq!(weak_js.weak_count(), 0); assert_eq!(weak_js.weak_count(), 0);
assert_eq!(weak_js.strong_count(), 0);
} }
} }
+16 -4
View File
@@ -24,6 +24,7 @@ use crate::{
config::{ConfigLoader, TomlConfigLoader}, config::{ConfigLoader, TomlConfigLoader},
error::Error, error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent, NetworkIdentity}, global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent, NetworkIdentity},
join_joinset_background,
stun::MockStunInfoCollector, stun::MockStunInfoCollector,
PeerId, PeerId,
}, },
@@ -181,6 +182,15 @@ impl ForeignNetworkEntry {
} }
} }
impl Drop for RpcTransport {
fn drop(&mut self) {
tracing::debug!(
"drop rpc transport for foreign network manager, my_peer_id: {:?}",
self.my_peer_id
);
}
}
let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel(); let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel();
let tspt = RpcTransport { let tspt = RpcTransport {
my_peer_id, my_peer_id,
@@ -216,7 +226,6 @@ impl ForeignNetworkEntry {
.list_global_foreign_peer(&self.network_identity) .list_global_foreign_peer(&self.network_identity)
.await; .await;
let local = peer_map.list_peers_with_conn().await; let local = peer_map.list_peers_with_conn().await;
tracing::debug!(?global, ?local, ?self.my_peer_id, "list peers in foreign network manager");
global.extend(local.iter().cloned()); global.extend(local.iter().cloned());
global global
.into_iter() .into_iter()
@@ -426,7 +435,7 @@ pub struct ForeignNetworkManager {
data: Arc<ForeignNetworkManagerData>, data: Arc<ForeignNetworkManagerData>,
tasks: Mutex<JoinSet<()>>, tasks: Arc<std::sync::Mutex<JoinSet<()>>>,
} }
impl ForeignNetworkManager { impl ForeignNetworkManager {
@@ -444,6 +453,9 @@ impl ForeignNetworkManager {
lock: std::sync::Mutex::new(()), lock: std::sync::Mutex::new(()),
}); });
let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new()));
join_joinset_background(tasks.clone(), "ForeignNetworkManager".to_string());
Self { Self {
my_peer_id, my_peer_id,
global_ctx, global_ctx,
@@ -451,7 +463,7 @@ impl ForeignNetworkManager {
data, data,
tasks: Mutex::new(JoinSet::new()), tasks,
} }
} }
@@ -503,7 +515,7 @@ impl ForeignNetworkManager {
let data = self.data.clone(); let data = self.data.clone();
let network_name = entry.network.network_name.clone(); let network_name = entry.network.network_name.clone();
let mut s = entry.global_ctx.subscribe(); let mut s = entry.global_ctx.subscribe();
self.tasks.lock().await.spawn(async move { self.tasks.lock().unwrap().spawn(async move {
while let Ok(e) = s.recv().await { while let Ok(e) = s.recv().await {
match &e { match &e {
GlobalCtxEvent::PeerRemoved(peer_id) => { GlobalCtxEvent::PeerRemoved(peer_id) => {
+7 -2
View File
@@ -84,9 +84,7 @@ impl PingIntervalController {
self.throughput.rx_packets() > self.last_throughput.rx_packets() self.throughput.rx_packets() > self.last_throughput.rx_packets()
} }
#[tracing::instrument]
fn should_send_ping(&mut self) -> bool { fn should_send_ping(&mut self) -> bool {
tracing::trace!(?self, "check should_send_ping");
if self.loss_counter.load(Ordering::Relaxed) > 0 { if self.loss_counter.load(Ordering::Relaxed) > 0 {
self.backoff_idx = 0; self.backoff_idx = 0;
} else if self.tx_increase() && !self.rx_increase() { } else if self.tx_increase() && !self.rx_increase() {
@@ -253,6 +251,13 @@ impl PeerConnPinger {
continue; continue;
} }
tracing::debug!(
"pingpong controller send pingpong task, seq: {}, node_id: {}, controller: {:?}",
req_seq,
my_node_id,
controller
);
let mut sink = sink.clone(); let mut sink = sink.clone();
let receiver = ctrl_resp_sender.subscribe(); let receiver = ctrl_resp_sender.subscribe();
let ping_res_sender = ping_res_sender.clone(); let ping_res_sender = ping_res_sender.clone();
+3 -6
View File
@@ -1450,9 +1450,6 @@ impl PeerRouteServiceImpl {
let my_peer_id = self.my_peer_id; let my_peer_id = self.my_peer_id;
let (peer_infos, conn_bitmap, foreign_network) = self.build_sync_request(&session); let (peer_infos, conn_bitmap, foreign_network) = self.build_sync_request(&session);
tracing::trace!(?foreign_network, "building sync_route request. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}",
my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session);
if peer_infos.is_none() if peer_infos.is_none()
&& conn_bitmap.is_none() && conn_bitmap.is_none()
&& foreign_network.is_none() && foreign_network.is_none()
@@ -1462,6 +1459,9 @@ impl PeerRouteServiceImpl {
return true; return true;
} }
tracing::debug!(?foreign_network, "sync_route request need send to peer. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}",
my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session);
session session
.need_sync_initiator_info .need_sync_initiator_info
.store(false, Ordering::Relaxed); .store(false, Ordering::Relaxed);
@@ -1728,7 +1728,6 @@ impl RouteSessionManager {
Ok(session) Ok(session)
} }
#[tracing::instrument(skip(self))]
async fn maintain_sessions(&self, service_impl: Arc<PeerRouteServiceImpl>) -> bool { async fn maintain_sessions(&self, service_impl: Arc<PeerRouteServiceImpl>) -> bool {
let mut cur_dst_peer_id_to_initiate = None; let mut cur_dst_peer_id_to_initiate = None;
let mut next_sleep_ms = 0; let mut next_sleep_ms = 0;
@@ -1764,8 +1763,6 @@ impl RouteSessionManager {
.map(|x| *x) .map(|x| *x)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
tracing::trace!(?service_impl.my_peer_id, ?peers, ?session_peers, ?initiator_candidates, "maintain_sessions begin");
if initiator_candidates.is_empty() { if initiator_candidates.is_empty() {
next_sleep_ms = 1000; next_sleep_ms = 1000;
continue; continue;
-7
View File
@@ -83,12 +83,6 @@ where
} }
} }
tracing::debug!(
?peers_to_connect,
?to_remove,
"got peers to connect and remove"
);
for key in to_remove { for key in to_remove {
if let Some((_, task)) = peer_task_map.remove(&key) { if let Some((_, task)) = peer_task_map.remove(&key) {
task.abort(); task.abort();
@@ -115,7 +109,6 @@ where
.insert(item.clone(), launcher.launch_task(&data, item).await.into()); .insert(item.clone(), launcher.launch_task(&data, item).await.into());
} }
} else if peer_task_map.is_empty() { } else if peer_task_map.is_empty() {
tracing::debug!("all task done");
launcher.all_task_done(&data).await; launcher.all_task_done(&data).await;
} }