fix wg client hang after some time (#297)

wg portal doesn't know client disconnect causing msg overstocked in queue, make
entire peer packet process pipeline hang.
This commit is contained in:
Sijie.Sun
2024-08-31 12:44:12 +08:00
committed by GitHub
parent 6964fb71fc
commit 2058dbc470
3 changed files with 43 additions and 9 deletions
+36 -5
View File
@@ -1,6 +1,7 @@
use std::{
net::{Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
use anyhow::Context;
@@ -9,7 +10,7 @@ use cidr::Ipv4Inet;
use dashmap::DashMap;
use futures::StreamExt;
use pnet::packet::ipv4::Ipv4Packet;
use tokio::task::JoinSet;
use tokio::{task::JoinSet, time::timeout};
use tracing::Level;
use crate::{
@@ -23,7 +24,7 @@ use crate::{
mpsc::{MpscTunnel, MpscTunnelSender},
packet_def::{PacketType, ZCPacket, ZCPacketType},
wireguard::{WgConfig, WgTunnelListener},
Tunnel, TunnelListener,
Tunnel, TunnelError, TunnelListener,
},
};
@@ -92,7 +93,25 @@ impl WireGuardImpl {
info.remote_addr.clone(),
));
while let Some(Ok(msg)) = stream.next().await {
let mut map_key = None;
loop {
let msg = match timeout(Duration::from_secs(120), stream.next()).await {
Ok(Some(Ok(msg))) => msg,
Ok(Some(Err(err))) => {
tracing::error!(?err, "Failed to receive from wg client");
break;
}
Ok(None) => {
tracing::info!("Wireguard client disconnected");
break;
}
Err(err) => {
tracing::error!(?err, "Timeout while receiving from wg client");
break;
}
};
assert_eq!(msg.packet_type(), ZCPacketType::WG);
let inner = msg.inner();
let Some(i) = Ipv4Packet::new(&inner) else {
@@ -104,6 +123,7 @@ impl WireGuardImpl {
endpoint_addr: remote_addr.parse().ok(),
sink: mpsc_tunnel.get_sink(),
});
map_key = Some(i.get_source());
wg_peer_ip_table.insert(i.get_source(), client_entry.clone());
ip_registered = true;
}
@@ -114,6 +134,11 @@ impl WireGuardImpl {
.await;
}
if map_key.is_some() {
tracing::info!(?map_key, "Removing wg client from table");
wg_peer_ip_table.remove(&map_key.unwrap());
}
peer_mgr
.get_global_ctx()
.issue_event(GlobalCtxEvent::VpnPortalClientDisconnected(
@@ -157,9 +182,15 @@ impl WireGuardImpl {
ZCPacketType::WG,
);
if let Err(ret) = entry.sink.send(packet).await {
tracing::debug!(?ret, "Failed to send packet to wg client");
match entry.sink.try_send(packet) {
Ok(_) => {
tracing::trace!("Sent packet to wg client");
}
Err(e) => {
tracing::debug!(?e, "Failed to send packet to wg client");
}
}
None
}
}