zero copy tunnel (#55)

make tunnel zero copy, for better performance. remove most of the locks in io path.
introduce quic tunnel
prepare for encryption
This commit is contained in:
Sijie.Sun
2024-04-24 23:12:46 +08:00
committed by GitHub
parent 39021d7b1b
commit 3467890270
44 changed files with 6504 additions and 688 deletions
+12 -12
View File
@@ -4,23 +4,23 @@ use std::{
};
use dashmap::DashMap;
use tokio::{
sync::{mpsc, Mutex},
task::JoinSet,
};
use tokio_util::bytes::Bytes;
use tokio::{sync::Mutex, task::JoinSet};
use crate::common::{
error::Error,
global_ctx::{ArcGlobalCtx, NetworkIdentity},
PeerId,
use crate::{
common::{
error::Error,
global_ctx::{ArcGlobalCtx, NetworkIdentity},
PeerId,
},
tunnel::packet_def::ZCPacket,
};
use super::{
foreign_network_manager::{ForeignNetworkServiceClient, FOREIGN_NETWORK_SERVICE_ID},
peer_conn::PeerConn,
peer_map::PeerMap,
peer_rpc::PeerRpcManager,
zc_peer_conn::PeerConn,
PacketRecvChan,
};
pub struct ForeignNetworkClient {
@@ -37,7 +37,7 @@ pub struct ForeignNetworkClient {
impl ForeignNetworkClient {
pub fn new(
global_ctx: ArcGlobalCtx,
packet_sender_to_mgr: mpsc::Sender<Bytes>,
packet_sender_to_mgr: PacketRecvChan,
peer_rpc: Arc<PeerRpcManager>,
my_peer_id: PeerId,
) -> Self {
@@ -148,7 +148,7 @@ impl ForeignNetworkClient {
self.next_hop.get(&peer_id).map(|v| v.clone())
}
pub async fn send_msg(&self, msg: Bytes, peer_id: PeerId) -> Result<(), Error> {
pub async fn send_msg(&self, msg: ZCPacket, peer_id: PeerId) -> Result<(), Error> {
if let Some(next_hop) = self.get_next_hop(peer_id) {
let ret = self.peer_map.send_msg_directly(msg, next_hop).await;
if ret.is_err() {
+67 -22
View File
@@ -15,19 +15,21 @@ use tokio::{
},
task::JoinSet,
};
use tokio_util::bytes::Bytes;
use crate::common::{
error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity},
PeerId,
use crate::{
common::{
error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity},
PeerId,
},
tunnel::packet_def::{PacketType, ZCPacket},
};
use super::{
packet::{self},
peer_conn::PeerConn,
peer_map::PeerMap,
peer_rpc::{PeerRpcManager, PeerRpcManagerTransport},
zc_peer_conn::PeerConn,
PacketRecvChan, PacketRecvChanReceiver,
};
struct ForeignNetworkEntry {
@@ -38,7 +40,7 @@ struct ForeignNetworkEntry {
impl ForeignNetworkEntry {
fn new(
network: NetworkIdentity,
packet_sender: mpsc::Sender<Bytes>,
packet_sender: PacketRecvChan,
global_ctx: ArcGlobalCtx,
my_peer_id: PeerId,
) -> Self {
@@ -53,7 +55,7 @@ struct ForeignNetworkManagerData {
}
impl ForeignNetworkManagerData {
async fn send_msg(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> {
async fn send_msg(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
let network_name = self
.peer_network_map
.get(&dst_peer_id)
@@ -94,7 +96,7 @@ struct RpcTransport {
my_peer_id: PeerId,
data: Arc<ForeignNetworkManagerData>,
packet_recv: Mutex<UnboundedReceiver<Bytes>>,
packet_recv: Mutex<UnboundedReceiver<ZCPacket>>,
}
#[async_trait::async_trait]
@@ -103,11 +105,11 @@ impl PeerRpcManagerTransport for RpcTransport {
self.my_peer_id
}
async fn send(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> {
async fn send(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
self.data.send_msg(msg, dst_peer_id).await
}
async fn recv(&self) -> Result<Bytes, Error> {
async fn recv(&self) -> Result<ZCPacket, Error> {
if let Some(o) = self.packet_recv.lock().await.recv().await {
Ok(o)
} else {
@@ -138,14 +140,14 @@ impl ForeignNetworkService for Arc<ForeignNetworkManagerData> {
pub struct ForeignNetworkManager {
my_peer_id: PeerId,
global_ctx: ArcGlobalCtx,
packet_sender_to_mgr: mpsc::Sender<Bytes>,
packet_sender_to_mgr: PacketRecvChan,
packet_sender: mpsc::Sender<Bytes>,
packet_recv: Mutex<Option<mpsc::Receiver<Bytes>>>,
packet_sender: PacketRecvChan,
packet_recv: Mutex<Option<PacketRecvChanReceiver>>,
data: Arc<ForeignNetworkManagerData>,
rpc_mgr: Arc<PeerRpcManager>,
rpc_transport_sender: UnboundedSender<Bytes>,
rpc_transport_sender: UnboundedSender<ZCPacket>,
tasks: Mutex<JoinSet<()>>,
}
@@ -154,7 +156,7 @@ impl ForeignNetworkManager {
pub fn new(
my_peer_id: PeerId,
global_ctx: ArcGlobalCtx,
packet_sender_to_mgr: mpsc::Sender<Bytes>,
packet_sender_to_mgr: PacketRecvChan,
) -> Self {
// recv packet from all foreign networks
let (packet_sender, packet_recv) = mpsc::channel(1000);
@@ -242,12 +244,15 @@ impl ForeignNetworkManager {
self.tasks.lock().await.spawn(async move {
while let Some(packet_bytes) = recv.recv().await {
let packet = packet::Packet::decode(&packet_bytes);
let from_peer_id = packet.from_peer.into();
let to_peer_id = packet.to_peer.into();
let Some(hdr) = packet_bytes.peer_manager_header() else {
tracing::warn!("invalid packet, skip");
continue;
};
let from_peer_id = hdr.from_peer_id.get();
let to_peer_id = hdr.to_peer_id.get();
if to_peer_id == my_node_id {
if packet.packet_type == packet::PacketType::TaRpc {
rpc_sender.send(packet_bytes.clone()).unwrap();
if hdr.packet_type == PacketType::TaRpc as u8 {
rpc_sender.send(packet_bytes).unwrap();
continue;
}
if let Err(e) = sender_to_mgr.send(packet_bytes).await {
@@ -343,6 +348,27 @@ mod tests {
peer_mgr
}
#[tokio::test]
async fn foreign_network_basic() {
let pm_center = create_mock_peer_manager_with_mock_stun(crate::rpc::NatType::Unknown).await;
tracing::debug!("pm_center: {:?}", pm_center.my_peer_id());
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
tracing::debug!(
"pma_net1: {:?}, pmb_net1: {:?}",
pma_net1.my_peer_id(),
pmb_net1.my_peer_id()
);
connect_peer_manager(pma_net1.clone(), pm_center.clone()).await;
connect_peer_manager(pmb_net1.clone(), pm_center.clone()).await;
wait_route_appear(pma_net1.clone(), pmb_net1.clone())
.await
.unwrap();
assert_eq!(1, pma_net1.list_routes().await.len());
assert_eq!(1, pmb_net1.list_routes().await.len());
}
#[tokio::test]
async fn test_foreign_network_manager() {
let pm_center = create_mock_peer_manager_with_mock_stun(crate::rpc::NatType::Unknown).await;
@@ -350,11 +376,23 @@ mod tests {
create_mock_peer_manager_with_mock_stun(crate::rpc::NatType::Unknown).await;
connect_peer_manager(pm_center.clone(), pm_center2.clone()).await;
tracing::debug!(
"pm_center: {:?}, pm_center2: {:?}",
pm_center.my_peer_id(),
pm_center2.my_peer_id()
);
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
connect_peer_manager(pma_net1.clone(), pm_center.clone()).await;
connect_peer_manager(pmb_net1.clone(), pm_center.clone()).await;
tracing::debug!(
"pma_net1: {:?}, pmb_net1: {:?}",
pma_net1.my_peer_id(),
pmb_net1.my_peer_id()
);
let now = std::time::Instant::now();
let mut succ = false;
while now.elapsed().as_secs() < 10 {
@@ -399,8 +437,15 @@ mod tests {
.unwrap();
assert_eq!(2, pmc_net1.list_routes().await.len());
tracing::debug!("pmc_net1: {:?}", pmc_net1.my_peer_id());
let pma_net2 = create_mock_peer_manager_for_foreign_network("net2").await;
let pmb_net2 = create_mock_peer_manager_for_foreign_network("net2").await;
tracing::debug!(
"pma_net2: {:?}, pmb_net2: {:?}",
pma_net2.my_peer_id(),
pmb_net2.my_peer_id()
);
connect_peer_manager(pma_net2.clone(), pm_center.clone()).await;
connect_peer_manager(pmb_net2.clone(), pm_center.clone()).await;
wait_route_appear(pma_net2.clone(), pmb_net2.clone())
+10 -9
View File
@@ -1,6 +1,7 @@
pub mod packet;
pub mod peer;
pub mod peer_conn;
// pub mod peer_conn;
pub mod peer_conn_ping;
pub mod peer_manager;
pub mod peer_map;
pub mod peer_ospf_route;
@@ -8,6 +9,7 @@ pub mod peer_rip_route;
pub mod peer_rpc;
pub mod route_trait;
pub mod rpc_service;
pub mod zc_peer_conn;
pub mod foreign_network_client;
pub mod foreign_network_manager;
@@ -15,25 +17,24 @@ pub mod foreign_network_manager;
#[cfg(test)]
pub mod tests;
use tokio_util::bytes::{Bytes, BytesMut};
use crate::tunnel::packet_def::ZCPacket;
#[async_trait::async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait PeerPacketFilter {
async fn try_process_packet_from_peer(
&self,
_packet: &packet::ArchivedPacket,
_data: &Bytes,
) -> Option<()> {
None
async fn try_process_packet_from_peer(&self, _zc_packet: ZCPacket) -> Option<ZCPacket> {
Some(_zc_packet)
}
}
#[async_trait::async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait NicPacketFilter {
async fn try_process_packet_from_nic(&self, data: BytesMut) -> BytesMut;
async fn try_process_packet_from_nic(&self, data: &mut ZCPacket);
}
type BoxPeerPacketFilter = Box<dyn PeerPacketFilter + Send + Sync>;
type BoxNicPacketFilter = Box<dyn NicPacketFilter + Send + Sync>;
pub type PacketRecvChan = tokio::sync::mpsc::Sender<ZCPacket>;
pub type PacketRecvChanReceiver = tokio::sync::mpsc::Receiver<ZCPacket>;
+17 -11
View File
@@ -7,16 +7,22 @@ use tokio::{
sync::{mpsc, Mutex},
task::JoinHandle,
};
use tokio_util::bytes::Bytes;
use tracing::Instrument;
use super::peer_conn::{PeerConn, PeerConnId};
use crate::common::{
error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtxEvent},
PeerId,
use super::{
zc_peer_conn::{PeerConn, PeerConnId},
PacketRecvChan,
};
use crate::rpc::PeerConnInfo;
use crate::{
common::{
error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtxEvent},
PeerId,
},
tunnel::packet_def::ZCPacket,
};
type ArcPeerConn = Arc<Mutex<PeerConn>>;
type ConnMap = Arc<DashMap<PeerConnId, ArcPeerConn>>;
@@ -26,7 +32,7 @@ pub struct Peer {
conns: ConnMap,
global_ctx: ArcGlobalCtx,
packet_recv_chan: mpsc::Sender<Bytes>,
packet_recv_chan: PacketRecvChan,
close_event_sender: mpsc::Sender<PeerConnId>,
close_event_listener: JoinHandle<()>,
@@ -37,7 +43,7 @@ pub struct Peer {
impl Peer {
pub fn new(
peer_node_id: PeerId,
packet_recv_chan: mpsc::Sender<Bytes>,
packet_recv_chan: PacketRecvChan,
global_ctx: ArcGlobalCtx,
) -> Self {
let conns: ConnMap = Arc::new(DashMap::new());
@@ -106,7 +112,7 @@ impl Peer {
.insert(conn.get_conn_id(), Arc::new(Mutex::new(conn)));
}
pub async fn send_msg(&self, msg: Bytes) -> Result<(), Error> {
pub async fn send_msg(&self, msg: ZCPacket) -> Result<(), Error> {
let Some(conn) = self.conns.iter().next() else {
return Err(Error::PeerNoConnectionError(self.peer_node_id));
};
@@ -157,8 +163,8 @@ mod tests {
use crate::{
common::{global_ctx::tests::get_mock_global_ctx, new_peer_id},
peers::peer_conn::PeerConn,
tunnels::ring_tunnel::create_ring_tunnel_pair,
peers::zc_peer_conn::PeerConn,
tunnel::ring::create_ring_tunnel_pair,
};
use super::Peer;
+219
View File
@@ -0,0 +1,219 @@
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};
use tokio::{sync::broadcast, task::JoinSet, time::timeout};
use crate::{
common::{error::Error, PeerId},
tunnel::{
mpsc::MpscTunnelSender,
packet_def::{PacketType, ZCPacket},
stats::WindowLatency,
TunnelError,
},
};
pub struct PeerConnPinger {
my_peer_id: PeerId,
peer_id: PeerId,
sink: MpscTunnelSender,
ctrl_sender: broadcast::Sender<ZCPacket>,
latency_stats: Arc<WindowLatency>,
loss_rate_stats: Arc<AtomicU32>,
tasks: JoinSet<Result<(), TunnelError>>,
}
impl std::fmt::Debug for PeerConnPinger {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PeerConnPinger")
.field("my_peer_id", &self.my_peer_id)
.field("peer_id", &self.peer_id)
.finish()
}
}
impl PeerConnPinger {
pub fn new(
my_peer_id: PeerId,
peer_id: PeerId,
sink: MpscTunnelSender,
ctrl_sender: broadcast::Sender<ZCPacket>,
latency_stats: Arc<WindowLatency>,
loss_rate_stats: Arc<AtomicU32>,
) -> Self {
Self {
my_peer_id,
peer_id,
sink,
tasks: JoinSet::new(),
latency_stats,
ctrl_sender,
loss_rate_stats,
}
}
fn new_ping_packet(my_node_id: PeerId, peer_id: PeerId, seq: u32) -> ZCPacket {
let mut packet = ZCPacket::new_with_payload(&seq.to_le_bytes());
packet.fill_peer_manager_hdr(my_node_id, peer_id, PacketType::Ping as u8);
packet
}
async fn do_pingpong_once(
my_node_id: PeerId,
peer_id: PeerId,
sink: &mut MpscTunnelSender,
receiver: &mut broadcast::Receiver<ZCPacket>,
seq: u32,
) -> Result<u128, Error> {
// should add seq here. so latency can be calculated more accurately
let req = Self::new_ping_packet(my_node_id, peer_id, seq);
sink.send(req).await?;
let now = std::time::Instant::now();
// wait until we get a pong packet in ctrl_resp_receiver
let resp = timeout(Duration::from_secs(1), async {
loop {
match receiver.recv().await {
Ok(p) => {
let payload = p.payload();
let Ok(seq_buf) = payload[0..4].try_into() else {
tracing::debug!("pingpong recv invalid packet, continue");
continue;
};
let resp_seq = u32::from_le_bytes(seq_buf);
if resp_seq == seq {
break;
}
}
Err(e) => {
return Err(Error::WaitRespError(format!(
"wait ping response error: {:?}",
e
)));
}
}
}
Ok(())
})
.await;
tracing::trace!(?resp, "wait ping response done");
if resp.is_err() {
return Err(Error::WaitRespError(
"wait ping response timeout".to_owned(),
));
}
if resp.as_ref().unwrap().is_err() {
return Err(resp.unwrap().err().unwrap());
}
Ok(now.elapsed().as_micros())
}
pub async fn pingpong(&mut self) {
let sink = self.sink.clone();
let my_node_id = self.my_peer_id;
let peer_id = self.peer_id;
let latency_stats = self.latency_stats.clone();
let (ping_res_sender, mut ping_res_receiver) = tokio::sync::mpsc::channel(100);
let stopped = Arc::new(AtomicU32::new(0));
// generate a pingpong task every 200ms
let mut pingpong_tasks = JoinSet::new();
let ctrl_resp_sender = self.ctrl_sender.clone();
let stopped_clone = stopped.clone();
self.tasks.spawn(async move {
let mut req_seq = 0;
loop {
let receiver = ctrl_resp_sender.subscribe();
let ping_res_sender = ping_res_sender.clone();
if stopped_clone.load(Ordering::Relaxed) != 0 {
return Ok(());
}
while pingpong_tasks.len() > 5 {
pingpong_tasks.join_next().await;
}
let mut sink = sink.clone();
pingpong_tasks.spawn(async move {
let mut receiver = receiver.resubscribe();
let pingpong_once_ret = Self::do_pingpong_once(
my_node_id,
peer_id,
&mut sink,
&mut receiver,
req_seq,
)
.await;
if let Err(e) = ping_res_sender.send(pingpong_once_ret).await {
tracing::info!(?e, "pingpong task send result error, exit..");
};
});
req_seq = req_seq.wrapping_add(1);
tokio::time::sleep(Duration::from_millis(1000)).await;
}
});
// one with 1% precision
let loss_rate_stats_1 = WindowLatency::new(100);
// one with 20% precision, so we can fast fail this conn.
let loss_rate_stats_20 = WindowLatency::new(5);
let mut counter: u64 = 0;
while let Some(ret) = ping_res_receiver.recv().await {
counter += 1;
if let Ok(lat) = ret {
latency_stats.record_latency(lat as u32);
loss_rate_stats_1.record_latency(0);
loss_rate_stats_20.record_latency(0);
} else {
loss_rate_stats_1.record_latency(1);
loss_rate_stats_20.record_latency(1);
}
let loss_rate_20: f64 = loss_rate_stats_20.get_latency_us();
let loss_rate_1: f64 = loss_rate_stats_1.get_latency_us();
tracing::trace!(
?ret,
?self,
?loss_rate_1,
?loss_rate_20,
"pingpong task recv pingpong_once result"
);
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) {
tracing::warn!(
?ret,
?self,
?loss_rate_1,
?loss_rate_20,
"pingpong loss rate too high, closing"
);
break;
}
self.loss_rate_stats
.store((loss_rate_1 * 100.0) as u32, Ordering::Relaxed);
}
stopped.store(1, Ordering::Relaxed);
ping_res_receiver.close();
}
}
+101 -80
View File
@@ -5,6 +5,7 @@ use std::{
};
use async_trait::async_trait;
use futures::StreamExt;
use tokio::{
@@ -15,30 +16,30 @@ use tokio::{
task::JoinSet,
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::bytes::{Bytes, BytesMut};
use tokio_util::bytes::Bytes;
use crate::{
common::{
error::Error, global_ctx::ArcGlobalCtx, rkyv_util::extract_bytes_from_archived_string,
PeerId,
},
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
peers::{
packet, peer_conn::PeerConn, peer_rpc::PeerRpcManagerTransport,
route_trait::RouteInterface, PeerPacketFilter,
packet, peer_rpc::PeerRpcManagerTransport, route_trait::RouteInterface,
zc_peer_conn::PeerConn, PeerPacketFilter,
},
tunnel::{
packet_def::{PacketType, ZCPacket},
SinkItem, Tunnel, TunnelConnector,
},
tunnels::{SinkItem, Tunnel, TunnelConnector},
};
use super::{
foreign_network_client::ForeignNetworkClient,
foreign_network_manager::ForeignNetworkManager,
peer_conn::PeerConnId,
peer_map::PeerMap,
peer_ospf_route::PeerRoute,
peer_rip_route::BasicRoute,
peer_rpc::PeerRpcManager,
route_trait::{ArcRoute, Route},
BoxNicPacketFilter, BoxPeerPacketFilter,
zc_peer_conn::PeerConnId,
BoxNicPacketFilter, BoxPeerPacketFilter, PacketRecvChanReceiver,
};
struct RpcTransport {
@@ -46,8 +47,8 @@ struct RpcTransport {
peers: Weak<PeerMap>,
foreign_peers: Mutex<Option<Weak<ForeignNetworkClient>>>,
packet_recv: Mutex<UnboundedReceiver<Bytes>>,
peer_rpc_tspt_sender: UnboundedSender<Bytes>,
packet_recv: Mutex<UnboundedReceiver<ZCPacket>>,
peer_rpc_tspt_sender: UnboundedSender<ZCPacket>,
}
#[async_trait::async_trait]
@@ -56,7 +57,7 @@ impl PeerRpcManagerTransport for RpcTransport {
self.my_peer_id
}
async fn send(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> {
async fn send(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
let foreign_peers = self
.foreign_peers
.lock()
@@ -67,21 +68,30 @@ impl PeerRpcManagerTransport for RpcTransport {
.ok_or(Error::Unknown)?;
let peers = self.peers.upgrade().ok_or(Error::Unknown)?;
let ret = peers.send_msg(msg.clone(), dst_peer_id).await;
if matches!(ret, Err(Error::RouteError(..))) && foreign_peers.has_next_hop(dst_peer_id) {
tracing::info!(
if let Some(gateway_id) = peers.get_gateway_peer_id(dst_peer_id).await {
tracing::trace!(
?dst_peer_id,
?gateway_id,
?self.my_peer_id,
"send msg to peer via gateway",
);
peers.send_msg_directly(msg, gateway_id).await
} else if foreign_peers.has_next_hop(dst_peer_id) {
tracing::debug!(
?dst_peer_id,
?self.my_peer_id,
"failed to send msg to peer, try foreign network",
);
return foreign_peers.send_msg(msg, dst_peer_id).await;
foreign_peers.send_msg(msg, dst_peer_id).await
} else {
Err(Error::RouteError(Some(format!(
"peermgr RpcTransport no route for dst_peer_id: {}",
dst_peer_id
))))
}
ret
}
async fn recv(&self) -> Result<Bytes, Error> {
async fn recv(&self) -> Result<ZCPacket, Error> {
if let Some(o) = self.packet_recv.lock().await.recv().await {
Ok(o)
} else {
@@ -110,7 +120,7 @@ pub struct PeerManager {
tasks: Arc<Mutex<JoinSet<()>>>,
packet_recv: Arc<Mutex<Option<mpsc::Receiver<Bytes>>>>,
packet_recv: Arc<Mutex<Option<PacketRecvChanReceiver>>>,
peers: Arc<PeerMap>,
@@ -261,17 +271,20 @@ impl PeerManager {
self.tasks.lock().await.spawn(async move {
log::trace!("start_peer_recv");
while let Some(ret) = recv.next().await {
log::trace!("peer recv a packet...: {:?}", ret);
let packet = packet::Packet::decode(&ret);
let from_peer_id: PeerId = packet.from_peer.into();
let to_peer_id: PeerId = packet.to_peer.into();
let Some(hdr) = ret.peer_manager_header() else {
tracing::warn!(?ret, "invalid packet, skip");
continue;
};
tracing::trace!(?hdr, ?ret, "peer recv a packet...");
let from_peer_id = hdr.from_peer_id.get();
let to_peer_id = hdr.to_peer_id.get();
if to_peer_id != my_peer_id {
log::trace!(
"need forward: to_peer_id: {:?}, my_peer_id: {:?}",
to_peer_id,
my_peer_id
);
let ret = peers.send_msg(ret.clone(), to_peer_id).await;
let ret = peers.send_msg(ret, to_peer_id).await;
if ret.is_err() {
log::error!(
"forward packet error: {:?}, dst: {:?}, from: {:?}",
@@ -282,15 +295,21 @@ impl PeerManager {
}
} else {
let mut processed = false;
let mut zc_packet = Some(ret);
let mut idx = 0;
for pipeline in pipe_line.read().await.iter().rev() {
if let Some(_) = pipeline.try_process_packet_from_peer(&packet, &ret).await
{
tracing::debug!(?zc_packet, ?idx, "try_process_packet_from_peer");
idx += 1;
zc_packet = pipeline
.try_process_packet_from_peer(zc_packet.unwrap())
.await;
if zc_packet.is_none() {
processed = true;
break;
}
}
if !processed {
tracing::error!("unexpected packet: {:?}", ret);
tracing::error!(?zc_packet, "unhandled packet");
}
}
}
@@ -321,20 +340,15 @@ impl PeerManager {
}
#[async_trait::async_trait]
impl PeerPacketFilter for NicPacketProcessor {
async fn try_process_packet_from_peer(
&self,
packet: &packet::ArchivedPacket,
data: &Bytes,
) -> Option<()> {
if packet.packet_type == packet::PacketType::Data {
async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option<ZCPacket> {
let hdr = packet.peer_manager_header().unwrap();
if hdr.packet_type == PacketType::Data as u8 {
tracing::trace!(?packet, "send packet to nic channel");
// TODO: use a function to get the body ref directly for zero copy
self.nic_channel
.send(extract_bytes_from_archived_string(data, &packet.payload))
.await
.unwrap();
Some(())
} else {
self.nic_channel.send(packet).await.unwrap();
None
} else {
Some(packet)
}
}
}
@@ -345,21 +359,18 @@ impl PeerManager {
// for peer rpc packet
struct PeerRpcPacketProcessor {
peer_rpc_tspt_sender: UnboundedSender<Bytes>,
peer_rpc_tspt_sender: UnboundedSender<ZCPacket>,
}
#[async_trait::async_trait]
impl PeerPacketFilter for PeerRpcPacketProcessor {
async fn try_process_packet_from_peer(
&self,
packet: &packet::ArchivedPacket,
data: &Bytes,
) -> Option<()> {
if packet.packet_type == packet::PacketType::TaRpc {
self.peer_rpc_tspt_sender.send(data.clone()).unwrap();
Some(())
} else {
async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option<ZCPacket> {
let hdr = packet.peer_manager_header().unwrap();
if hdr.packet_type == PacketType::TaRpc as u8 {
self.peer_rpc_tspt_sender.send(packet).unwrap();
None
} else {
Some(packet)
}
}
}
@@ -401,7 +412,7 @@ impl PeerManager {
async fn send_route_packet(
&self,
msg: Bytes,
route_id: u8,
_route_id: u8,
dst_peer_id: PeerId,
) -> Result<(), Error> {
let foreign_client = self
@@ -409,15 +420,17 @@ impl PeerManager {
.upgrade()
.ok_or(Error::Unknown)?;
let peer_map = self.peers.upgrade().ok_or(Error::Unknown)?;
let packet_bytes: Bytes =
packet::Packet::new_route_packet(self.my_peer_id, dst_peer_id, route_id, &msg)
.into();
let mut zc_packet = ZCPacket::new_with_payload(&msg);
zc_packet.fill_peer_manager_hdr(
self.my_peer_id,
dst_peer_id,
PacketType::Route as u8,
);
if foreign_client.has_next_hop(dst_peer_id) {
return foreign_client.send_msg(packet_bytes, dst_peer_id).await;
foreign_client.send_msg(zc_packet, dst_peer_id).await
} else {
peer_map.send_msg_directly(zc_packet, dst_peer_id).await
}
peer_map.send_msg_directly(packet_bytes, dst_peer_id).await
}
fn my_peer_id(&self) -> PeerId {
self.my_peer_id
@@ -450,18 +463,17 @@ impl PeerManager {
self.get_route().list_routes().await
}
async fn run_nic_packet_process_pipeline(&self, mut data: BytesMut) -> BytesMut {
async fn run_nic_packet_process_pipeline(&self, data: &mut ZCPacket) {
for pipeline in self.nic_packet_process_pipeline.read().await.iter().rev() {
data = pipeline.try_process_packet_from_nic(data).await;
pipeline.try_process_packet_from_nic(data).await;
}
data
}
pub async fn send_msg(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> {
pub async fn send_msg(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
self.peers.send_msg(msg, dst_peer_id).await
}
pub async fn send_msg_ipv4(&self, msg: BytesMut, ipv4_addr: Ipv4Addr) -> Result<(), Error> {
pub async fn send_msg_ipv4(&self, mut msg: ZCPacket, ipv4_addr: Ipv4Addr) -> Result<(), Error> {
log::trace!(
"do send_msg in peer manager, msg: {:?}, ipv4_addr: {}",
msg,
@@ -487,25 +499,34 @@ impl PeerManager {
return Ok(());
}
let msg = self.run_nic_packet_process_pipeline(msg).await;
self.run_nic_packet_process_pipeline(&mut msg).await;
let mut errs: Vec<Error> = vec![];
for peer_id in dst_peers.iter() {
let msg: Bytes =
packet::Packet::new_data_packet(self.my_peer_id, peer_id.clone(), &msg).into();
let send_ret = self.peers.send_msg(msg.clone(), *peer_id).await;
let mut msg = Some(msg);
let total_dst_peers = dst_peers.len();
for i in 0..total_dst_peers {
let mut msg = if i == total_dst_peers - 1 {
msg.take().unwrap()
} else {
msg.clone().unwrap()
};
if matches!(send_ret, Err(Error::RouteError(..)))
&& self.foreign_network_client.has_next_hop(*peer_id)
{
let foreign_send_ret = self.foreign_network_client.send_msg(msg, *peer_id).await;
if foreign_send_ret.is_ok() {
continue;
let peer_id = &dst_peers[i];
msg.fill_peer_manager_hdr(self.my_peer_id, *peer_id, packet::PacketType::Data as u8);
if let Some(gateway) = self.peers.get_gateway_peer_id(*peer_id).await {
if let Err(e) = self.peers.send_msg_directly(msg.clone(), gateway).await {
errs.push(e);
}
} else if self.foreign_network_client.has_next_hop(*peer_id) {
if let Err(e) = self
.foreign_network_client
.send_msg(msg.clone(), *peer_id)
.await
{
errs.push(e);
}
}
if let Err(send_ret) = send_ret {
errs.push(send_ret);
}
}
+31 -29
View File
@@ -2,8 +2,7 @@ use std::{net::Ipv4Addr, sync::Arc};
use anyhow::Context;
use dashmap::DashMap;
use tokio::sync::{mpsc, RwLock};
use tokio_util::bytes::Bytes;
use tokio::sync::RwLock;
use crate::{
common::{
@@ -12,29 +11,27 @@ use crate::{
PeerId,
},
rpc::PeerConnInfo,
tunnel::packet_def::ZCPacket,
tunnels::TunnelError,
};
use super::{
peer::Peer,
peer_conn::{PeerConn, PeerConnId},
route_trait::ArcRoute,
zc_peer_conn::{PeerConn, PeerConnId},
PacketRecvChan,
};
pub struct PeerMap {
global_ctx: ArcGlobalCtx,
my_peer_id: PeerId,
peer_map: DashMap<PeerId, Arc<Peer>>,
packet_send: mpsc::Sender<Bytes>,
packet_send: PacketRecvChan,
routes: RwLock<Vec<ArcRoute>>,
}
impl PeerMap {
pub fn new(
packet_send: mpsc::Sender<Bytes>,
global_ctx: ArcGlobalCtx,
my_peer_id: PeerId,
) -> Self {
pub fn new(packet_send: PacketRecvChan, global_ctx: ArcGlobalCtx, my_peer_id: PeerId) -> Self {
PeerMap {
global_ctx,
my_peer_id,
@@ -72,7 +69,7 @@ impl PeerMap {
self.peer_map.contains_key(&peer_id)
}
pub async fn send_msg_directly(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> {
pub async fn send_msg_directly(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
if dst_peer_id == self.my_peer_id {
return Ok(self
.packet_send
@@ -87,48 +84,53 @@ impl PeerMap {
}
None => {
log::error!("no peer for dst_peer_id: {}", dst_peer_id);
return Err(Error::RouteError(None));
return Err(Error::RouteError(Some(format!(
"peer map sengmsg directly no connected dst_peer_id: {}",
dst_peer_id
))));
}
}
Ok(())
}
pub async fn send_msg(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> {
pub async fn get_gateway_peer_id(&self, dst_peer_id: PeerId) -> Option<PeerId> {
if dst_peer_id == self.my_peer_id {
return Ok(self
.packet_send
.send(msg)
.await
.with_context(|| "send msg to self failed")?);
return Some(dst_peer_id);
}
if self.has_peer(dst_peer_id) {
return Some(dst_peer_id);
}
// get route info
let mut gateway_peer_id = None;
for route in self.routes.read().await.iter() {
gateway_peer_id = route.get_next_hop(dst_peer_id).await;
if gateway_peer_id.is_none() {
continue;
} else {
break;
if let Some(gateway_peer_id) = route.get_next_hop(dst_peer_id).await {
// for foreign network, gateway_peer_id may not connect to me
if self.has_peer(gateway_peer_id) {
return Some(gateway_peer_id);
}
}
}
if gateway_peer_id.is_none() && self.has_peer(dst_peer_id) {
gateway_peer_id = Some(dst_peer_id);
}
None
}
let Some(gateway_peer_id) = gateway_peer_id else {
pub async fn send_msg(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
let Some(gateway_peer_id) = self.get_gateway_peer_id(dst_peer_id).await else {
tracing::trace!(
"no gateway for dst_peer_id: {}, peers: {:?}, my_peer_id: {}",
dst_peer_id,
self.peer_map.iter().map(|v| *v.key()).collect::<Vec<_>>(),
self.my_peer_id
);
return Err(Error::RouteError(None));
return Err(Error::RouteError(Some(format!(
"peer map sengmsg no gateway for dst_peer_id: {}",
dst_peer_id
))));
};
self.send_msg_directly(msg.clone(), gateway_peer_id).await?;
self.send_msg_directly(msg, gateway_peer_id).await?;
return Ok(());
}
+17 -16
View File
@@ -203,6 +203,7 @@ struct SyncRouteInfoResponse {
trait RouteService {
async fn sync_route_info(
my_peer_id: PeerId,
my_session_id: SessionId,
is_initiator: bool,
peer_infos: Option<Vec<RoutePeerInfo>>,
conn_bitmap: Option<RouteConnBitmap>,
@@ -547,6 +548,15 @@ impl SyncRouteSession {
self.we_are_initiator.store(is_initiator, Ordering::Relaxed);
self.need_sync_initiator_info.store(true, Ordering::Relaxed);
}
fn update_dst_session_id(&self, session_id: SessionId) {
if session_id != self.dst_session_id.load(Ordering::Relaxed) {
tracing::warn!(?self, ?session_id, "session id mismatch, clear saved info.");
self.dst_session_id.store(session_id, Ordering::Relaxed);
self.dst_saved_conn_bitmap_version.clear();
self.dst_saved_peer_info_versions.clear();
}
}
}
struct PeerRouteServiceImpl {
@@ -794,6 +804,7 @@ impl PeerRouteServiceImpl {
.sync_route_info(
rpc_ctx,
my_peer_id,
session.my_session_id.load(Ordering::Relaxed),
session.we_are_initiator.load(Ordering::Relaxed),
peer_infos.clone(),
conn_bitmap.clone(),
@@ -814,19 +825,7 @@ impl PeerRouteServiceImpl {
.need_sync_initiator_info
.store(false, Ordering::Relaxed);
if ret.session_id != session.dst_session_id.load(Ordering::Relaxed) {
tracing::warn!(
?ret,
?my_peer_id,
?dst_peer_id,
"session id mismatch, clear saved info."
);
session
.dst_session_id
.store(ret.session_id, Ordering::Relaxed);
session.dst_saved_conn_bitmap_version.clear();
session.dst_saved_peer_info_versions.clear();
}
session.update_dst_session_id(ret.session_id);
if let Some(peer_infos) = &peer_infos {
session.update_dst_saved_peer_info_version(&peer_infos);
@@ -864,6 +863,7 @@ impl RouteService for RouteSessionManager {
self,
_: tarpc::context::Context,
from_peer_id: PeerId,
from_session_id: SessionId,
is_initiator: bool,
peer_infos: Option<Vec<RoutePeerInfo>>,
conn_bitmap: Option<RouteConnBitmap>,
@@ -877,6 +877,8 @@ impl RouteService for RouteSessionManager {
session.rpc_rx_count.fetch_add(1, Ordering::Relaxed);
session.update_dst_session_id(from_session_id);
if let Some(peer_infos) = &peer_infos {
service_impl.synced_route_info.update_peer_infos(
my_peer_id,
@@ -1383,9 +1385,8 @@ mod tests {
let i_a = get_is_initiator(&r_a, p_b.my_peer_id());
let i_b = get_is_initiator(&r_b, p_a.my_peer_id());
assert_ne!(i_a.0, i_a.1);
assert_ne!(i_b.0, i_b.1);
assert_ne!(i_a.0, i_b.0);
assert_eq!(i_a.0, i_b.1);
assert_eq!(i_b.0, i_a.1);
drop(r_b);
drop(p_b);
+11 -24
View File
@@ -15,14 +15,12 @@ use tracing::Instrument;
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, stun::StunInfoCollectorTrait, PeerId},
peers::{
packet,
route_trait::{Route, RouteInterfaceBox},
},
peers::route_trait::{Route, RouteInterfaceBox},
rpc::{NatType, StunInfo},
tunnel::packet_def::{PacketType, ZCPacket},
};
use super::{packet::CtrlPacketPayload, PeerPacketFilter};
use super::PeerPacketFilter;
const SEND_ROUTE_PERIOD_SEC: u64 = 60;
const SEND_ROUTE_FAST_REPLY_SEC: u64 = 5;
@@ -625,26 +623,15 @@ impl Route for BasicRoute {
#[async_trait::async_trait]
impl PeerPacketFilter for BasicRoute {
async fn try_process_packet_from_peer(
&self,
packet: &packet::ArchivedPacket,
_data: &Bytes,
) -> Option<()> {
if packet.packet_type == packet::PacketType::RoutePacket {
let CtrlPacketPayload::RoutePacket(route_packet) =
CtrlPacketPayload::from_packet(packet)
else {
return None;
};
self.handle_route_packet(
packet.from_peer.into(),
route_packet.body.into_boxed_slice().into(),
)
.await;
Some(())
} else {
async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option<ZCPacket> {
let hdr = packet.peer_manager_header().unwrap();
if hdr.packet_type == PacketType::Route as u8 {
let b = packet.payload().to_vec();
self.handle_route_packet(hdr.from_peer_id.get(), b.into())
.await;
None
} else {
Some(packet)
}
}
}
+75 -56
View File
@@ -2,22 +2,22 @@ use std::sync::{atomic::AtomicU32, Arc};
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use rkyv::Deserialize;
use prost::Message;
use tarpc::{server::Channel, transport::channel::UnboundedChannel};
use tokio::{
sync::mpsc::{self, UnboundedSender},
task::JoinSet,
};
use tokio_util::bytes::Bytes;
use tracing::Instrument;
use crate::{
common::{error::Error, PeerId},
peers::packet::Packet,
rpc::TaRpcPacket,
tunnel::packet_def::{PacketType, ZCPacket},
};
use super::packet::CtrlPacketPayload;
type PeerRpcServiceId = u32;
type PeerRpcTransactId = u32;
@@ -25,11 +25,11 @@ type PeerRpcTransactId = u32;
#[auto_impl::auto_impl(Arc)]
pub trait PeerRpcManagerTransport: Send + Sync + 'static {
fn my_peer_id(&self) -> PeerId;
async fn send(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error>;
async fn recv(&self) -> Result<Bytes, Error>;
async fn send(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error>;
async fn recv(&self) -> Result<ZCPacket, Error>;
}
type PacketSender = UnboundedSender<Packet>;
type PacketSender = UnboundedSender<ZCPacket>;
struct PeerRpcEndPoint {
peer_id: PeerId,
@@ -63,16 +63,6 @@ impl std::fmt::Debug for PeerRpcManager {
}
}
#[derive(Debug)]
struct TaRpcPacketInfo {
from_peer: PeerId,
to_peer: PeerId,
service_id: PeerRpcServiceId,
transact_id: PeerRpcTransactId,
is_req: bool,
content: Vec<u8>,
}
impl PeerRpcManager {
pub fn new(tspt: impl PeerRpcManagerTransport) -> Self {
Self {
@@ -100,7 +90,7 @@ impl PeerRpcManager {
let tspt = self.tspt.clone();
let creator = Box::new(move |peer_id: PeerId| {
let mut tasks = JoinSet::new();
let (packet_sender, mut packet_receiver) = mpsc::unbounded_channel::<Packet>();
let (packet_sender, mut packet_receiver) = mpsc::unbounded_channel();
let (mut client_transport, server_transport) = tarpc::transport::channel::unbounded();
let server = tarpc::server::BaseChannel::with_defaults(server_transport);
@@ -122,7 +112,7 @@ impl PeerRpcManager {
continue;
};
tracing::trace!(resp = ?resp, "recv packet from client");
tracing::debug!(resp = ?resp, "server recv packet from service provider");
if resp.is_err() {
tracing::warn!(err = ?resp.err(),
"[PEER RPC MGR] client_transport in server side got channel error, ignore it.");
@@ -136,7 +126,7 @@ impl PeerRpcManager {
continue;
}
let msg = Packet::new_tarpc_packet(
let msg = Self::build_rpc_packet(
tspt.my_peer_id(),
cur_req_peer_id,
service_id,
@@ -145,12 +135,13 @@ impl PeerRpcManager {
serialized_resp.unwrap(),
);
if let Err(e) = tspt.send(msg.into(), peer_id).await {
if let Err(e) = tspt.send(msg, peer_id).await {
tracing::error!(error = ?e, peer_id = ?peer_id, service_id = ?service_id, "send resp to peer failed");
}
}
Some(packet) = packet_receiver.recv() => {
let info = Self::parse_rpc_packet(&packet);
tracing::debug!(?info, "server recv packet from peer");
if let Err(e) = info {
tracing::error!(error = ?e, packet = ?packet, "parse rpc packet failed");
continue;
@@ -168,7 +159,7 @@ impl PeerRpcManager {
}
assert_eq!(info.service_id, service_id);
cur_req_peer_id = Some(packet.from_peer.clone().into());
cur_req_peer_id = Some(info.from_peer);
cur_transact_id = info.transact_id;
tracing::trace!("recv packet from peer, packet: {:?}", packet);
@@ -219,19 +210,33 @@ impl PeerRpcManager {
)
}
fn parse_rpc_packet(packet: &Packet) -> Result<TaRpcPacketInfo, Error> {
let ctrl_packet_payload = CtrlPacketPayload::from_packet2(&packet);
match &ctrl_packet_payload {
CtrlPacketPayload::TaRpc(id, tid, is_req, body) => Ok(TaRpcPacketInfo {
from_peer: packet.from_peer.into(),
to_peer: packet.to_peer.into(),
service_id: *id,
transact_id: *tid,
is_req: *is_req,
content: body.clone(),
}),
_ => Err(Error::ShellCommandError("invalid packet".to_owned())),
}
fn parse_rpc_packet(packet: &ZCPacket) -> Result<TaRpcPacket, Error> {
let payload = packet.payload();
TaRpcPacket::decode(payload).map_err(|e| Error::MessageDecodeError(e.to_string()))
}
fn build_rpc_packet(
from_peer: PeerId,
to_peer: PeerId,
service_id: PeerRpcServiceId,
transact_id: PeerRpcTransactId,
is_req: bool,
content: Vec<u8>,
) -> ZCPacket {
let packet = TaRpcPacket {
from_peer,
to_peer,
service_id,
transact_id,
is_req,
content,
};
let mut buf = Vec::new();
packet.encode(&mut buf).unwrap();
let mut zc_packet = ZCPacket::new_with_payload(&buf);
zc_packet.fill_peer_manager_hdr(from_peer, to_peer, PacketType::TaRpc as u8);
zc_packet
}
pub fn run(&self) {
@@ -245,9 +250,9 @@ impl PeerRpcManager {
tracing::warn!("peer rpc transport read aborted, exiting");
break;
};
let packet = Packet::decode(&o);
let packet: Packet = packet.deserialize(&mut rkyv::Infallible).unwrap();
let info = Self::parse_rpc_packet(&packet).unwrap();
let info = Self::parse_rpc_packet(&o).unwrap();
tracing::debug!(?info, "recv rpc packet from peer");
if info.is_req {
if !service_registry.contains_key(&info.service_id) {
@@ -265,15 +270,15 @@ impl PeerRpcManager {
service_registry.get(&info.service_id).unwrap()(info.from_peer)
});
endpoint.packet_sender.send(packet).unwrap();
endpoint.packet_sender.send(o).unwrap();
} else {
if let Some(a) = client_resp_receivers.get(&PeerRpcClientCtxKey(
info.from_peer,
info.service_id,
info.transact_id,
)) {
log::trace!("recv resp: {:?}", packet);
if let Err(e) = a.send(packet) {
tracing::trace!("recv resp: {:?}", info);
if let Err(e) = a.send(o) {
tracing::error!(error = ?e, "send resp to client failed");
}
} else {
@@ -297,7 +302,8 @@ impl PeerRpcManager {
Fut: std::future::Future<Output = RpcRet>,
{
let mut tasks = JoinSet::new();
let (packet_sender, mut packet_receiver) = mpsc::unbounded_channel::<Packet>();
let (packet_sender, mut packet_receiver) = mpsc::unbounded_channel();
let (client_transport, server_transport) =
tarpc::transport::channel::unbounded::<CM, Req>();
@@ -321,7 +327,7 @@ impl PeerRpcManager {
continue;
}
let a = Packet::new_tarpc_packet(
let packet = Self::build_rpc_packet(
tspt.my_peer_id(),
dst_peer_id,
service_id,
@@ -330,7 +336,9 @@ impl PeerRpcManager {
a.unwrap(),
);
if let Err(e) = tspt.send(a.into(), dst_peer_id).await {
tracing::debug!(?packet, "client send rpc packet to peer");
if let Err(e) = tspt.send(packet, dst_peer_id).await {
tracing::error!(error = ?e, dst_peer_id = ?dst_peer_id, "send to peer failed");
}
}
@@ -342,11 +350,12 @@ impl PeerRpcManager {
while let Some(packet) = packet_receiver.recv().await {
tracing::trace!("tunnel recv: {:?}", packet);
let info = PeerRpcManager::parse_rpc_packet(&packet);
let info = Self::parse_rpc_packet(&packet);
if let Err(e) = info {
tracing::error!(error = ?e, "parse rpc packet failed");
continue;
}
tracing::debug!(?info, "client recv rpc packet from peer");
let decoded = postcard::from_bytes(&info.unwrap().content.as_slice());
if let Err(e) = decoded {
@@ -381,8 +390,10 @@ impl PeerRpcManager {
#[cfg(test)]
mod tests {
use std::{pin::Pin, sync::Arc};
use futures::{SinkExt, StreamExt};
use tokio_util::bytes::Bytes;
use tokio::sync::Mutex;
use crate::{
common::{error::Error, new_peer_id, PeerId},
@@ -390,7 +401,10 @@ mod tests {
peer_rpc::PeerRpcManager,
tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
},
tunnels::{self, ring_tunnel::create_ring_tunnel_pair},
tunnel::{
packet_def::ZCPacket, ring::create_ring_tunnel_pair, Tunnel, ZCPacketSink,
ZCPacketStream,
},
};
use super::PeerRpcManagerTransport;
@@ -415,7 +429,8 @@ mod tests {
#[tokio::test]
async fn peer_rpc_basic_test() {
struct MockTransport {
tunnel: Box<dyn tunnels::Tunnel>,
sink: Arc<Mutex<Pin<Box<dyn ZCPacketSink>>>>,
stream: Arc<Mutex<Pin<Box<dyn ZCPacketStream>>>>,
my_peer_id: PeerId,
}
@@ -424,22 +439,25 @@ mod tests {
fn my_peer_id(&self) -> PeerId {
self.my_peer_id
}
async fn send(&self, msg: Bytes, _dst_peer_id: PeerId) -> Result<(), Error> {
async fn send(&self, msg: ZCPacket, _dst_peer_id: PeerId) -> Result<(), Error> {
println!("rpc mgr send: {:?}", msg);
self.tunnel.pin_sink().send(msg).await.unwrap();
self.sink.lock().await.send(msg).await.unwrap();
Ok(())
}
async fn recv(&self) -> Result<Bytes, Error> {
let ret = self.tunnel.pin_stream().next().await.unwrap();
async fn recv(&self) -> Result<ZCPacket, Error> {
let ret = self.stream.lock().await.next().await.unwrap();
println!("rpc mgr recv: {:?}", ret);
return ret.map(|v| v.freeze()).map_err(|_| Error::Unknown);
return ret.map_err(|e| e.into());
}
}
let (ct, st) = create_ring_tunnel_pair();
let (cts, ctsr) = ct.split();
let (sts, stsr) = st.split();
let server_rpc_mgr = PeerRpcManager::new(MockTransport {
tunnel: st,
sink: Arc::new(Mutex::new(ctsr)),
stream: Arc::new(Mutex::new(cts)),
my_peer_id: new_peer_id(),
});
server_rpc_mgr.run();
@@ -449,7 +467,8 @@ mod tests {
server_rpc_mgr.run_service(1, s.serve());
let client_rpc_mgr = PeerRpcManager::new(MockTransport {
tunnel: ct,
sink: Arc::new(Mutex::new(stsr)),
stream: Arc::new(Mutex::new(sts)),
my_peer_id: new_peer_id(),
});
client_rpc_mgr.run();
+1 -1
View File
@@ -4,7 +4,7 @@ use futures::Future;
use crate::{
common::{error::Error, global_ctx::tests::get_mock_global_ctx, PeerId},
tunnels::ring_tunnel::create_ring_tunnel_pair,
tunnel::ring::create_ring_tunnel_pair,
};
use super::peer_manager::{PeerManager, RouteAlgoType};
+748
View File
@@ -0,0 +1,748 @@
use std::{
any::Any,
fmt::Debug,
pin::Pin,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
use futures::{SinkExt, StreamExt, TryFutureExt};
use prost::Message;
use tokio::{
sync::{broadcast, mpsc},
task::JoinSet,
time::{timeout, Duration},
};
use tokio_util::sync::PollSender;
use tracing::Instrument;
use zerocopy::AsBytes;
use crate::{
common::{
error::Error,
global_ctx::{ArcGlobalCtx, NetworkIdentity},
PeerId,
},
peers::packet::PacketType,
rpc::{HandshakeRequest, PeerConnInfo, PeerConnStats, TunnelInfo},
tunnel::{
filter::{StatsRecorderTunnelFilter, TunnelFilter, TunnelWithFilter},
mpsc::{MpscTunnel, MpscTunnelSender},
packet_def::ZCPacket,
stats::{Throughput, WindowLatency},
Tunnel, TunnelError, ZCPacketStream,
},
};
use super::{peer_conn_ping::PeerConnPinger, PacketRecvChan};
pub type PeerConnId = uuid::Uuid;
const MAGIC: u32 = 0xd1e1a5e1;
const VERSION: u32 = 1;
pub struct PeerConn {
conn_id: PeerConnId,
my_peer_id: PeerId,
global_ctx: ArcGlobalCtx,
tunnel: Box<dyn Any + Send + 'static>,
sink: MpscTunnelSender,
recv: Option<Pin<Box<dyn ZCPacketStream>>>,
tunnel_info: Option<TunnelInfo>,
tasks: JoinSet<Result<(), TunnelError>>,
info: Option<HandshakeRequest>,
close_event_sender: Option<mpsc::Sender<PeerConnId>>,
ctrl_resp_sender: broadcast::Sender<ZCPacket>,
latency_stats: Arc<WindowLatency>,
throughput: Arc<Throughput>,
loss_rate_stats: Arc<AtomicU32>,
}
impl Debug for PeerConn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PeerConn")
.field("conn_id", &self.conn_id)
.field("my_peer_id", &self.my_peer_id)
.field("info", &self.info)
.finish()
}
}
impl PeerConn {
pub fn new(my_peer_id: PeerId, global_ctx: ArcGlobalCtx, tunnel: Box<dyn Tunnel>) -> Self {
let tunnel_info = tunnel.info();
let (ctrl_sender, _ctrl_receiver) = broadcast::channel(100);
let peer_conn_tunnel_filter = StatsRecorderTunnelFilter::new();
let throughput = peer_conn_tunnel_filter.filter_output();
let peer_conn_tunnel = TunnelWithFilter::new(tunnel, peer_conn_tunnel_filter);
let mut mpsc_tunnel = MpscTunnel::new(peer_conn_tunnel);
let (recv, sink) = (mpsc_tunnel.get_stream(), mpsc_tunnel.get_sink());
PeerConn {
conn_id: PeerConnId::new_v4(),
my_peer_id,
global_ctx,
tunnel: Box::new(mpsc_tunnel),
sink,
recv: Some(recv),
tunnel_info,
tasks: JoinSet::new(),
info: None,
close_event_sender: None,
ctrl_resp_sender: ctrl_sender,
latency_stats: Arc::new(WindowLatency::new(15)),
throughput,
loss_rate_stats: Arc::new(AtomicU32::new(0)),
}
}
pub fn get_conn_id(&self) -> PeerConnId {
self.conn_id
}
async fn wait_handshake(&mut self) -> Result<HandshakeRequest, Error> {
let recv = self.recv.as_mut().unwrap();
let Some(rsp) = recv.next().await else {
return Err(Error::WaitRespError(
"conn closed during wait handshake response".to_owned(),
));
};
let rsp = rsp?;
let rsp = HandshakeRequest::decode(rsp.payload())
.map_err(|e| Error::WaitRespError(format!("decode handshake response error: {:?}", e)));
return Ok(rsp.unwrap());
}
async fn wait_handshake_loop(&mut self) -> Result<HandshakeRequest, Error> {
Ok(timeout(Duration::from_secs(5), async move {
loop {
match self.wait_handshake().await {
Ok(rsp) => return rsp,
Err(e) => {
log::warn!("wait handshake error: {:?}", e);
}
}
}
})
.map_err(|e| Error::WaitRespError(format!("wait handshake timeout: {:?}", e)))
.await?)
}
async fn send_handshake(&mut self) -> Result<(), Error> {
let network = self.global_ctx.get_network_identity();
let req = HandshakeRequest {
magic: MAGIC,
my_peer_id: self.my_peer_id,
version: VERSION,
features: Vec::new(),
network_name: network.network_name.clone(),
network_secret: network.network_secret.clone(),
};
let hs_req = req.encode_to_vec();
let mut zc_packet = ZCPacket::new_with_payload(hs_req.as_bytes());
zc_packet.fill_peer_manager_hdr(
self.my_peer_id,
PeerId::default(),
PacketType::HandShake as u8,
);
self.sink.send(zc_packet).await.map_err(|e| {
tracing::warn!("send handshake request error: {:?}", e);
Error::WaitRespError("send handshake request error".to_owned())
})?;
Ok(())
}
#[tracing::instrument]
pub async fn do_handshake_as_server(&mut self) -> Result<(), Error> {
let rsp = self.wait_handshake_loop().await?;
tracing::info!("handshake request: {:?}", rsp);
self.info = Some(rsp);
self.send_handshake().await?;
Ok(())
}
#[tracing::instrument]
pub async fn do_handshake_as_client(&mut self) -> Result<(), Error> {
self.send_handshake().await?;
tracing::info!("waiting for handshake request from server");
let rsp = self.wait_handshake_loop().await?;
tracing::info!("handshake response: {:?}", rsp);
self.info = Some(rsp);
Ok(())
}
pub fn handshake_done(&self) -> bool {
self.info.is_some()
}
pub fn start_recv_loop(&mut self, packet_recv_chan: PacketRecvChan) {
let mut stream = self.recv.take().unwrap();
let sink = self.sink.clone();
let mut sender = PollSender::new(packet_recv_chan.clone());
let close_event_sender = self.close_event_sender.clone().unwrap();
let conn_id = self.conn_id;
let ctrl_sender = self.ctrl_resp_sender.clone();
let _conn_info = self.get_conn_info();
let conn_info_for_instrument = self.get_conn_info();
self.tasks.spawn(
async move {
tracing::info!("start recving peer conn packet");
let mut task_ret = Ok(());
while let Some(ret) = stream.next().await {
if ret.is_err() {
tracing::error!(error = ?ret, "peer conn recv error");
task_ret = Err(ret.err().unwrap());
break;
}
let mut zc_packet = ret.unwrap();
let Some(peer_mgr_hdr) = zc_packet.mut_peer_manager_header() else {
tracing::error!(
"unexpected packet: {:?}, cannot decode peer manager hdr",
zc_packet
);
continue;
};
if peer_mgr_hdr.packet_type == PacketType::Ping as u8 {
peer_mgr_hdr.packet_type = PacketType::Pong as u8;
if let Err(e) = sink.send(zc_packet).await {
tracing::error!(?e, "peer conn send req error");
}
} else if peer_mgr_hdr.packet_type == PacketType::Pong as u8 {
if let Err(e) = ctrl_sender.send(zc_packet) {
tracing::error!(?e, "peer conn send ctrl resp error");
}
} else {
if sender.send(zc_packet).await.is_err() {
break;
}
}
}
tracing::info!("end recving peer conn packet");
drop(sink);
if let Err(e) = close_event_sender.send(conn_id).await {
tracing::error!(error = ?e, "peer conn close event send error");
}
task_ret
}
.instrument(
tracing::info_span!("peer conn recv loop", conn_info = ?conn_info_for_instrument),
),
);
}
pub fn start_pingpong(&mut self) {
let mut pingpong = PeerConnPinger::new(
self.my_peer_id,
self.get_peer_id(),
self.sink.clone(),
self.ctrl_resp_sender.clone(),
self.latency_stats.clone(),
self.loss_rate_stats.clone(),
);
let close_event_sender = self.close_event_sender.clone().unwrap();
let conn_id = self.conn_id;
self.tasks.spawn(async move {
pingpong.pingpong().await;
tracing::warn!(?pingpong, "pingpong task exit");
if let Err(e) = close_event_sender.send(conn_id).await {
log::warn!("close event sender error: {:?}", e);
}
Ok(())
});
}
pub async fn send_msg(&mut self, msg: ZCPacket) -> Result<(), Error> {
Ok(self.sink.send(msg).await?)
}
pub fn get_peer_id(&self) -> PeerId {
self.info.as_ref().unwrap().my_peer_id
}
pub fn get_network_identity(&self) -> NetworkIdentity {
let info = self.info.as_ref().unwrap();
NetworkIdentity {
network_name: info.network_name.clone(),
network_secret: info.network_secret.clone(),
}
}
pub fn set_close_event_sender(&mut self, sender: mpsc::Sender<PeerConnId>) {
self.close_event_sender = Some(sender);
}
pub fn get_stats(&self) -> PeerConnStats {
PeerConnStats {
latency_us: self.latency_stats.get_latency_us(),
tx_bytes: self.throughput.tx_bytes(),
rx_bytes: self.throughput.rx_bytes(),
tx_packets: self.throughput.tx_packets(),
rx_packets: self.throughput.rx_packets(),
}
}
pub fn get_conn_info(&self) -> PeerConnInfo {
PeerConnInfo {
conn_id: self.conn_id.to_string(),
my_peer_id: self.my_peer_id,
peer_id: self.get_peer_id(),
features: self.info.as_ref().unwrap().features.clone(),
tunnel: self.tunnel_info.clone(),
stats: Some(self.get_stats()),
loss_rate: (f64::from(self.loss_rate_stats.load(Ordering::Relaxed)) / 100.0) as f32,
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::common::global_ctx::tests::get_mock_global_ctx;
use crate::common::new_peer_id;
use crate::tunnel::filter::tests::DropSendTunnelFilter;
use crate::tunnel::filter::PacketRecorderTunnelFilter;
use crate::tunnel::ring::create_ring_tunnel_pair;
#[tokio::test]
async fn peer_conn_handshake() {
let (c, s) = create_ring_tunnel_pair();
let c_recorder = Arc::new(PacketRecorderTunnelFilter::new());
let s_recorder = Arc::new(PacketRecorderTunnelFilter::new());
let c = TunnelWithFilter::new(c, c_recorder.clone());
let s = TunnelWithFilter::new(s, s_recorder.clone());
let c_peer_id = new_peer_id();
let s_peer_id = new_peer_id();
let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c));
let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s));
let (c_ret, s_ret) = tokio::join!(
c_peer.do_handshake_as_client(),
s_peer.do_handshake_as_server()
);
c_ret.unwrap();
s_ret.unwrap();
assert_eq!(c_recorder.sent.lock().unwrap().len(), 1);
assert_eq!(c_recorder.received.lock().unwrap().len(), 1);
assert_eq!(s_recorder.sent.lock().unwrap().len(), 1);
assert_eq!(s_recorder.received.lock().unwrap().len(), 1);
assert_eq!(c_peer.get_peer_id(), s_peer_id);
assert_eq!(s_peer.get_peer_id(), c_peer_id);
assert_eq!(c_peer.get_network_identity(), s_peer.get_network_identity());
assert_eq!(c_peer.get_network_identity(), NetworkIdentity::default());
}
async fn peer_conn_pingpong_test_common(drop_start: u32, drop_end: u32, conn_closed: bool) {
let (c, s) = create_ring_tunnel_pair();
// drop 1-3 packets should not affect pingpong
let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
let c = TunnelWithFilter::new(c, c_recorder.clone());
let c_peer_id = new_peer_id();
let s_peer_id = new_peer_id();
let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c));
let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s));
let (c_ret, s_ret) = tokio::join!(
c_peer.do_handshake_as_client(),
s_peer.do_handshake_as_server()
);
s_peer.set_close_event_sender(tokio::sync::mpsc::channel(1).0);
s_peer.start_recv_loop(tokio::sync::mpsc::channel(200).0);
assert!(c_ret.is_ok());
assert!(s_ret.is_ok());
let (close_send, mut close_recv) = tokio::sync::mpsc::channel(1);
c_peer.set_close_event_sender(close_send);
c_peer.start_pingpong();
c_peer.start_recv_loop(tokio::sync::mpsc::channel(200).0);
// wait 5s, conn should not be disconnected
tokio::time::sleep(Duration::from_secs(15)).await;
if conn_closed {
assert!(close_recv.try_recv().is_ok());
} else {
assert!(close_recv.try_recv().is_err());
}
}
#[tokio::test]
async fn peer_conn_pingpong_timeout() {
peer_conn_pingpong_test_common(3, 5, false).await;
peer_conn_pingpong_test_common(5, 12, true).await;
}
}
/*
use std::{
fmt::Debug,
pin::Pin,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
use futures::{SinkExt, StreamExt};
use pnet::datalink::NetworkInterface;
use tokio::{
sync::{broadcast, mpsc, Mutex},
task::JoinSet,
time::{timeout, Duration},
};
use tokio_util::{bytes::Bytes, sync::PollSender};
use tracing::Instrument;
use crate::{
common::{
error::Error,
global_ctx::{ArcGlobalCtx, NetworkIdentity},
PeerId,
},
define_tunnel_filter_chain,
peers::packet::{ArchivedPacketType, CtrlPacketPayload, PacketType},
rpc::{PeerConnInfo, PeerConnStats},
tunnel::{mpsc::MpscTunnelSender, stats::WindowLatency, TunnelError},
};
use super::packet::{self, HandShake, Packet};
pub type PacketRecvChan = mpsc::Sender<Bytes>;
macro_rules! wait_response {
($stream: ident, $out_var:ident, $pattern:pat_param => $value:expr) => {
let Ok(rsp_vec) = timeout(Duration::from_secs(1), $stream.next()).await else {
return Err(Error::WaitRespError(
"wait handshake response timeout".to_owned(),
));
};
let Some(rsp_vec) = rsp_vec else {
return Err(Error::WaitRespError(
"wait handshake response get none".to_owned(),
));
};
let Ok(rsp_vec) = rsp_vec else {
return Err(Error::WaitRespError(format!(
"wait handshake response get error {}",
rsp_vec.err().unwrap()
)));
};
let $out_var;
let rsp_bytes = Packet::decode(&rsp_vec);
if rsp_bytes.packet_type != PacketType::HandShake {
tracing::error!("unexpected packet type: {:?}", rsp_bytes);
return Err(Error::WaitRespError("unexpected packet type".to_owned()));
}
let resp_payload = CtrlPacketPayload::from_packet(&rsp_bytes);
match &resp_payload {
$pattern => $out_var = $value,
_ => {
tracing::error!(
"unexpected packet: {:?}, pattern: {:?}",
rsp_bytes,
stringify!($pattern)
);
return Err(Error::WaitRespError("unexpected packet".to_owned()));
}
}
};
}
impl<'a> From<&HandShake> for PeerInfo {
fn from(hs: &HandShake) -> Self {
PeerInfo {
magic: hs.magic.into(),
my_peer_id: hs.my_peer_id.into(),
version: hs.version.into(),
features: hs.features.iter().map(|x| x.to_string()).collect(),
interfaces: Vec::new(),
network_identity: hs.network_identity.clone(),
}
}
}
define_tunnel_filter_chain!(PeerConnTunnel, stats = StatsRecorderTunnelFilter);
pub struct PeerConn {
conn_id: PeerConnId,
my_peer_id: PeerId,
global_ctx: ArcGlobalCtx,
sink: Pin<Box<dyn DatagramSink>>,
tunnel: Box<dyn Tunnel>,
tasks: JoinSet<Result<(), TunnelError>>,
info: Option<PeerInfo>,
close_event_sender: Option<mpsc::Sender<PeerConnId>>,
ctrl_resp_sender: broadcast::Sender<Bytes>,
latency_stats: Arc<WindowLatency>,
throughput: Arc<Throughput>,
loss_rate_stats: Arc<AtomicU32>,
}
enum PeerConnPacketType {
Data(Bytes),
CtrlReq(Bytes),
CtrlResp(Bytes),
}
impl PeerConn {
pub fn new(my_peer_id: PeerId, global_ctx: ArcGlobalCtx, tunnel: Box<dyn Tunnel>) -> Self {
let (ctrl_sender, _ctrl_receiver) = broadcast::channel(100);
let peer_conn_tunnel = PeerConnTunnel::new();
let tunnel = peer_conn_tunnel.wrap_tunnel(tunnel);
PeerConn {
conn_id: PeerConnId::new_v4(),
my_peer_id,
global_ctx,
sink: tunnel.pin_sink(),
tunnel: Box::new(tunnel),
tasks: JoinSet::new(),
info: None,
close_event_sender: None,
ctrl_resp_sender: ctrl_sender,
latency_stats: Arc::new(WindowLatency::new(15)),
throughput: peer_conn_tunnel.stats.get_throughput().clone(),
loss_rate_stats: Arc::new(AtomicU32::new(0)),
}
}
pub fn get_conn_id(&self) -> PeerConnId {
self.conn_id
}
#[tracing::instrument]
pub async fn do_handshake_as_server(&mut self) -> Result<(), TunnelError> {
let mut stream = self.tunnel.pin_stream();
let mut sink = self.tunnel.pin_sink();
tracing::info!("waiting for handshake request from client");
wait_response!(stream, hs_req, CtrlPacketPayload::HandShake(x) => x);
self.info = Some(PeerInfo::from(hs_req));
tracing::info!("handshake request: {:?}", hs_req);
let hs_req = self
.global_ctx
.net_ns
.run(|| packet::Packet::new_handshake(self.my_peer_id, &self.global_ctx.network));
sink.send(hs_req.into()).await?;
Ok(())
}
#[tracing::instrument]
pub async fn do_handshake_as_client(&mut self) -> Result<(), TunnelError> {
let mut stream = self.tunnel.pin_stream();
let mut sink = self.tunnel.pin_sink();
let hs_req = self
.global_ctx
.net_ns
.run(|| packet::Packet::new_handshake(self.my_peer_id, &self.global_ctx.network));
sink.send(hs_req.into()).await?;
tracing::info!("waiting for handshake request from server");
wait_response!(stream, hs_rsp, CtrlPacketPayload::HandShake(x) => x);
self.info = Some(PeerInfo::from(hs_rsp));
tracing::info!("handshake response: {:?}", hs_rsp);
Ok(())
}
pub fn handshake_done(&self) -> bool {
self.info.is_some()
}
pub fn start_recv_loop(&mut self, packet_recv_chan: PacketRecvChan) {
let mut stream = self.tunnel.pin_stream();
let mut sink = self.tunnel.pin_sink();
let mut sender = PollSender::new(packet_recv_chan.clone());
let close_event_sender = self.close_event_sender.clone().unwrap();
let conn_id = self.conn_id;
let ctrl_sender = self.ctrl_resp_sender.clone();
let conn_info = self.get_conn_info();
let conn_info_for_instrument = self.get_conn_info();
self.tasks.spawn(
async move {
tracing::info!("start recving peer conn packet");
let mut task_ret = Ok(());
while let Some(ret) = stream.next().await {
if ret.is_err() {
tracing::error!(error = ?ret, "peer conn recv error");
task_ret = Err(ret.err().unwrap());
break;
}
let buf = ret.unwrap();
let p = Packet::decode(&buf);
match p.packet_type {
ArchivedPacketType::Ping => {
let CtrlPacketPayload::Ping(seq) = CtrlPacketPayload::from_packet(p)
else {
log::error!("unexpected packet: {:?}", p);
continue;
};
let pong = packet::Packet::new_pong_packet(
conn_info.my_peer_id,
conn_info.peer_id,
seq.into(),
);
if let Err(e) = sink.send(pong.into()).await {
tracing::error!(?e, "peer conn send req error");
}
}
ArchivedPacketType::Pong => {
if let Err(e) = ctrl_sender.send(buf.into()) {
tracing::error!(?e, "peer conn send ctrl resp error");
}
}
_ => {
if sender.send(buf.into()).await.is_err() {
break;
}
}
}
}
tracing::info!("end recving peer conn packet");
if let Err(close_ret) = sink.close().await {
tracing::error!(error = ?close_ret, "peer conn sink close error, ignore it");
}
if let Err(e) = close_event_sender.send(conn_id).await {
tracing::error!(error = ?e, "peer conn close event send error");
}
task_ret
}
.instrument(
tracing::info_span!("peer conn recv loop", conn_info = ?conn_info_for_instrument),
),
);
}
pub async fn send_msg(&mut self, msg: Bytes) -> Result<(), Error> {
self.sink.send(msg).await
}
pub fn get_peer_id(&self) -> PeerId {
self.info.as_ref().unwrap().my_peer_id
}
pub fn get_network_identity(&self) -> NetworkIdentity {
self.info.as_ref().unwrap().network_identity.clone()
}
pub fn set_close_event_sender(&mut self, sender: mpsc::Sender<PeerConnId>) {
self.close_event_sender = Some(sender);
}
pub fn get_stats(&self) -> PeerConnStats {
PeerConnStats {
latency_us: self.latency_stats.get_latency_us(),
tx_bytes: self.throughput.tx_bytes(),
rx_bytes: self.throughput.rx_bytes(),
tx_packets: self.throughput.tx_packets(),
rx_packets: self.throughput.rx_packets(),
}
}
pub fn get_conn_info(&self) -> PeerConnInfo {
PeerConnInfo {
conn_id: self.conn_id.to_string(),
my_peer_id: self.my_peer_id,
peer_id: self.get_peer_id(),
features: self.info.as_ref().unwrap().features.clone(),
tunnel: self.tunnel.info(),
stats: Some(self.get_stats()),
loss_rate: (f64::from(self.loss_rate_stats.load(Ordering::Relaxed)) / 100.0) as f32,
}
}
}
impl Drop for PeerConn {
fn drop(&mut self) {
let mut sink = self.tunnel.pin_sink();
tokio::spawn(async move {
let ret = sink.close().await;
tracing::info!(error = ?ret, "peer conn tunnel closed.");
});
log::info!("peer conn {:?} drop", self.conn_id);
}
}
}
*/