feat: relay peer end-to-end encryption via Noise IK handshake (#1960)

Enable encryption for non-direct nodes requiring relay forwarding.
When secure_mode is enabled, peers perform Noise IK handshake to
establish an encrypted PeerSession. Relay packets are encrypted at
the sender and decrypted at the receiver. Intermediate forwarding
nodes cannot read plaintext data.

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: KKRainbow <5665404+KKRainbow@users.noreply.github.com>
This commit is contained in:
KKRainbow
2026-03-07 14:47:22 +08:00
committed by GitHub
parent 22b4c4be2c
commit 59d4475743
14 changed files with 2081 additions and 73 deletions
+45 -1
View File
@@ -47,7 +47,9 @@ use super::{
peer_ospf_route::PeerRoute,
peer_rpc::{PeerRpcManager, PeerRpcManagerTransport},
peer_rpc_service::DirectConnectorManagerRpcServer,
peer_session::PeerSessionStore,
recv_packet_from_chan,
relay_peer_map::RelayPeerMap,
route_trait::NextHopPolicy,
PacketRecvChan, PacketRecvChanReceiver, PUBLIC_SERVER_HOSTNAME_PREFIX,
};
@@ -64,6 +66,8 @@ struct ForeignNetworkEntry {
global_ctx: ArcGlobalCtx,
network: NetworkIdentity,
peer_map: Arc<PeerMap>,
relay_peer_map: Arc<RelayPeerMap>,
peer_session_store: Arc<PeerSessionStore>,
relay_data: bool,
pm_packet_sender: Mutex<Option<PacketRecvChan>>,
@@ -103,6 +107,13 @@ impl ForeignNetworkEntry {
foreign_global_ctx.clone(),
my_peer_id,
));
let peer_session_store = Arc::new(PeerSessionStore::new());
let relay_peer_map = RelayPeerMap::new(
peer_map.clone(),
foreign_global_ctx.clone(),
my_peer_id,
peer_session_store.clone(),
);
let (peer_rpc, rpc_transport_sender) = Self::build_rpc_tspt(my_peer_id, peer_map.clone());
@@ -136,6 +147,8 @@ impl ForeignNetworkEntry {
global_ctx: foreign_global_ctx,
network,
peer_map,
relay_peer_map,
peer_session_store,
relay_data,
pm_packet_sender: Mutex::new(Some(pm_packet_sender)),
@@ -314,6 +327,7 @@ impl ForeignNetworkEntry {
let my_node_id = self.my_peer_id;
let rpc_sender = self.rpc_sender.clone();
let peer_map = self.peer_map.clone();
let relay_peer_map = self.relay_peer_map.clone();
let relay_data = self.relay_data;
let pm_sender = self.pm_packet_sender.lock().await.take().unwrap();
let network_name = self.network.network_name.clone();
@@ -344,6 +358,12 @@ impl ForeignNetworkEntry {
tracing::trace!(?hdr, "recv packet in foreign network manager");
let to_peer_id = hdr.to_peer_id.get();
if to_peer_id == my_node_id {
if hdr.packet_type == PacketType::RelayHandshake as u8
|| hdr.packet_type == PacketType::RelayHandshakeAck as u8
{
let _ = relay_peer_map.handle_handshake_packet(zc_packet).await;
continue;
}
if hdr.packet_type == PacketType::TaRpc as u8
|| hdr.packet_type == PacketType::RpcReq as u8
|| hdr.packet_type == PacketType::RpcResp as u8
@@ -358,6 +378,8 @@ impl ForeignNetworkEntry {
if hdr.packet_type == PacketType::Data as u8
|| hdr.packet_type == PacketType::KcpSrc as u8
|| hdr.packet_type == PacketType::KcpDst as u8
|| hdr.packet_type == PacketType::RelayHandshake as u8
|| hdr.packet_type == PacketType::RelayHandshakeAck as u8
{
if !relay_data {
continue;
@@ -405,9 +427,31 @@ impl ForeignNetworkEntry {
});
}
/// Spawn a background task that periodically evicts relay sessions that
/// have been idle for at least 60s. Runs every 30s on self.tasks.
async fn run_relay_session_gc_routine(&self) {
let relay_peer_map = self.relay_peer_map.clone();
self.tasks.lock().await.spawn(async move {
loop {
// Evict first, then sleep — the first sweep happens immediately.
relay_peer_map.evict_idle_sessions(std::time::Duration::from_secs(60));
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
}
});
}
/// Spawn a background task that drops PeerSessions no longer referenced
/// by any connection (see PeerSessionStore::evict_unused_sessions).
async fn run_peer_session_gc_routine(&self) {
let peer_session_store = self.peer_session_store.clone();
self.tasks.lock().await.spawn(async move {
loop {
// Sleep first — unlike the relay GC above, the first sweep is
// delayed by one full interval.
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
peer_session_store.evict_unused_sessions();
}
});
}
/// Bring the foreign network entry fully online: routing, packet
/// processing, both session GC routines, RPC, and the peer center.
async fn prepare(&self, accessor: Box<dyn GlobalForeignNetworkAccessor>) {
self.prepare_route(accessor).await;
self.start_packet_recv().await;
self.run_relay_session_gc_routine().await;
self.run_peer_session_gc_routine().await;
self.peer_rpc.run();
self.peer_center.init().await;
}
@@ -734,7 +778,7 @@ impl ForeignNetworkManager {
) -> Result<(), Error> {
if let Some(entry) = self.data.get_network_entry(network_name) {
entry
.peer_map
.relay_peer_map
.send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop)
.await
} else {
+1
View File
@@ -10,6 +10,7 @@ pub mod peer_ospf_route;
pub mod peer_rpc;
pub mod peer_rpc_service;
pub mod peer_session;
pub mod relay_peer_map;
pub mod route_trait;
pub mod rpc_service;
+56 -5
View File
@@ -138,6 +138,8 @@ impl PeerSessionTunnelFilter {
hdr.packet_type == PacketType::NoiseHandshakeMsg1 as u8
|| hdr.packet_type == PacketType::NoiseHandshakeMsg2 as u8
|| hdr.packet_type == PacketType::NoiseHandshakeMsg3 as u8
|| hdr.packet_type == PacketType::RelayHandshake as u8
|| hdr.packet_type == PacketType::RelayHandshakeAck as u8
|| hdr.packet_type == PacketType::Ping as u8
|| hdr.packet_type == PacketType::Pong as u8
}
@@ -169,9 +171,19 @@ impl TunnelFilter for PeerSessionTunnelFilter {
};
let my_peer_id = self.my_peer_id.load();
session
.encrypt_payload(my_peer_id, peer_id, &mut data)
.ok()?;
if my_peer_id != hdr.from_peer_id.get() {
return Some(data);
}
if let Err(e) = session.encrypt_payload(my_peer_id, peer_id, &mut data) {
tracing::warn!(
?my_peer_id,
?peer_id,
?e,
"PeerSessionTunnelFilter: encrypt failed, dropping packet"
);
return None;
}
Some(data)
}
@@ -198,7 +210,14 @@ impl TunnelFilter for PeerSessionTunnelFilter {
if from_peer_id == 0 {
return Some(Ok(data));
}
self.peer_id.store(Some(from_peer_id));
let Some(peer_id) = self.peer_id.load() else {
return Some(Ok(data));
};
if from_peer_id != peer_id {
return Some(Ok(data));
}
let mut guard = self.session.lock().unwrap();
let Some(session) = guard.as_mut() else {
@@ -206,7 +225,22 @@ impl TunnelFilter for PeerSessionTunnelFilter {
};
let my_peer_id = self.my_peer_id.load();
let _ = session.decrypt_payload(from_peer_id, my_peer_id, &mut data);
if hdr.to_peer_id.get() != my_peer_id {
return Some(Ok(data));
}
if let Err(e) = session.decrypt_payload(from_peer_id, my_peer_id, &mut data) {
if !session.is_valid() {
// Session auto-invalidated after too many consecutive failures.
// Close the connection to trigger reconnection with a fresh handshake.
tracing::error!(?e, "session invalidated, closing connection");
return Some(Err(TunnelError::InternalError(
"session invalidated due to consecutive decrypt failures".to_string(),
)));
}
// Transient failure, drop this packet but keep the connection alive.
return None;
}
Some(Ok(data))
}
@@ -775,6 +809,13 @@ impl PeerConn {
.get_remote_static()
.map(|x: &[u8]| x.to_vec())
.unwrap_or_default();
let remote_static_key = if remote_static.len() == 32 {
let mut key = [0u8; 32];
key.copy_from_slice(&remote_static);
Some(key)
} else {
None
};
if let Some(pinned) = pinned_remote_pubkey.as_ref() {
if pinned.as_slice() == remote_static.as_slice() {
@@ -812,6 +853,7 @@ impl PeerConn {
msg2_pb.initial_epoch,
algo,
msg2_pb.server_encryption_algorithm.clone(),
remote_static_key,
)?;
Ok(NoiseHandshakeResult {
@@ -949,6 +991,7 @@ impl PeerConn {
msg1_pb.a_session_generation,
algo.clone(),
msg1_pb.client_encryption_algorithm.clone(),
None,
)?;
let b_conn_id = uuid::Uuid::new_v4();
@@ -1022,6 +1065,14 @@ impl PeerConn {
.get_remote_static()
.map(|x: &[u8]| x.to_vec())
.unwrap_or_default();
let remote_static_key = if remote_static.len() == 32 {
let mut key = [0u8; 32];
key.copy_from_slice(&remote_static);
Some(key)
} else {
None
};
session.check_or_set_peer_static_pubkey(remote_static_key)?;
let handshake_hash = hs.get_handshake_hash().to_vec();
+132 -19
View File
@@ -62,6 +62,7 @@ use super::{
peer_map::PeerMap,
peer_ospf_route::PeerRoute,
peer_rpc::PeerRpcManager,
relay_peer_map::RelayPeerMap,
route_trait::{ArcRoute, Route},
BoxNicPacketFilter, BoxPeerPacketFilter, PacketRecvChan, PacketRecvChanReceiver,
};
@@ -76,6 +77,7 @@ struct RpcTransport {
peer_rpc_tspt_sender: UnboundedSender<ZCPacket>,
encryptor: Arc<dyn Encryptor>,
is_secure_mode_enabled: bool,
}
#[async_trait::async_trait]
@@ -93,7 +95,7 @@ impl PeerRpcManagerTransport for RpcTransport {
.and_then(|x| x.feature_flag.map(|x| x.is_public_server))
// if dst is directly connected, it's must not public server
.unwrap_or(!peers.has_peer(dst_peer_id));
if !is_dst_peer_public_server {
if !is_dst_peer_public_server && !self.is_secure_mode_enabled {
self.encryptor
.encrypt(&mut msg)
.with_context(|| "encrypt failed")?;
@@ -150,6 +152,7 @@ pub struct PeerManager {
foreign_network_manager: Arc<ForeignNetworkManager>,
foreign_network_client: Arc<ForeignNetworkClient>,
relay_peer_map: Arc<RelayPeerMap>,
encryptor: Arc<dyn Encryptor + 'static>,
data_compress_algo: CompressorAlgo,
@@ -163,6 +166,7 @@ pub struct PeerManager {
self_tx_counters: SelfTxCounters,
peer_session_store: Arc<PeerSessionStore>,
is_secure_mode_enabled: bool,
}
impl Debug for PeerManager {
@@ -189,6 +193,13 @@ impl PeerManager {
global_ctx.clone(),
my_peer_id,
));
let peer_session_store = Arc::new(PeerSessionStore::new());
let relay_peer_map = RelayPeerMap::new(
peers.clone(),
global_ctx.clone(),
my_peer_id,
peer_session_store.clone(),
);
let encryptor = if global_ctx.get_flags().enable_encryption {
// 只有在启用加密时才使用工厂函数选择算法
@@ -213,6 +224,12 @@ impl PeerManager {
global_ctx.set_feature_flags(f);
}
let is_secure_mode_enabled = global_ctx
.config
.get_secure_mode()
.map(|cfg| cfg.enabled)
.unwrap_or(false);
// TODO: remove these because we have impl pipeline processor.
let (peer_rpc_tspt_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel();
let rpc_tspt = Arc::new(RpcTransport {
@@ -222,6 +239,7 @@ impl PeerManager {
packet_recv: Mutex::new(peer_rpc_tspt_recv),
peer_rpc_tspt_sender,
encryptor: encryptor.clone(),
is_secure_mode_enabled,
});
let peer_rpc_mgr = Arc::new(PeerRpcManager::new_with_stats_manager(
rpc_tspt.clone(),
@@ -304,6 +322,7 @@ impl PeerManager {
foreign_network_manager,
foreign_network_client,
relay_peer_map,
encryptor,
data_compress_algo,
@@ -316,7 +335,8 @@ impl PeerManager {
self_tx_counters,
peer_session_store: Arc::new(PeerSessionStore::new()),
peer_session_store,
is_secure_mode_enabled,
}
}
@@ -645,11 +665,13 @@ impl PeerManager {
let peers = self.peers.clone();
let pipe_line = self.peer_packet_process_pipeline.clone();
let foreign_client = self.foreign_network_client.clone();
let relay_peer_map = self.relay_peer_map.clone();
let foreign_mgr = self.foreign_network_manager.clone();
let encryptor = self.encryptor.clone();
let compress_algo = self.data_compress_algo;
let acl_filter = self.global_ctx.get_acl_filter().clone();
let global_ctx = self.global_ctx.clone();
let secure_mode_enabled = self.is_secure_mode_enabled;
let stats_mgr = self.global_ctx.stats_manager().clone();
let route = self.get_route();
@@ -713,9 +735,13 @@ impl PeerManager {
|| hdr.packet_type == PacketType::KcpSrc as u8
|| hdr.packet_type == PacketType::KcpDst as u8
{
let _ =
Self::try_compress_and_encrypt(compress_algo, &encryptor, &mut ret)
.await;
let _ = Self::try_compress_and_encrypt(
compress_algo,
&encryptor,
&mut ret,
secure_mode_enabled,
)
.await;
}
compress_tx_bytes_after.add(ret.buf_len() as u64);
@@ -727,16 +753,42 @@ impl PeerManager {
}
tracing::trace!(?to_peer_id, ?my_peer_id, "need forward");
let ret =
Self::send_msg_internal(&peers, &foreign_client, ret, to_peer_id).await;
let ret = Self::send_msg_internal(
&peers,
&foreign_client,
&relay_peer_map,
ret,
to_peer_id,
)
.await;
if ret.is_err() {
tracing::error!(?ret, ?to_peer_id, ?from_peer_id, "forward packet error");
}
} else {
if let Err(e) = encryptor.decrypt(&mut ret) {
tracing::error!(?e, "decrypt failed");
if hdr.packet_type == PacketType::RelayHandshake as u8
|| hdr.packet_type == PacketType::RelayHandshakeAck as u8
{
let _ = relay_peer_map.handle_handshake_packet(ret).await;
continue;
}
if !secure_mode_enabled {
if let Err(e) = encryptor.decrypt(&mut ret) {
tracing::error!(?e, "decrypt failed");
continue;
}
} else if !peers.has_peer(from_peer_id) {
match relay_peer_map.decrypt_if_needed(&mut ret) {
Ok(true) => {}
Ok(false) => {
tracing::error!("relay session not found");
continue;
}
Err(e) => {
tracing::error!(?e, "relay decrypt failed");
continue;
}
}
}
self_rx_bytes.add(buf_len as u64);
self_rx_packets.inc();
@@ -1033,16 +1085,27 @@ impl PeerManager {
.compress_tx_bytes_before
.add(msg.buf_len() as u64);
Self::try_compress_and_encrypt(self.data_compress_algo, &self.encryptor, &mut msg).await?;
Self::try_compress_and_encrypt(
self.data_compress_algo,
&self.encryptor,
&mut msg,
self.is_secure_mode_enabled,
)
.await?;
self.self_tx_counters
.compress_tx_bytes_after
.add(msg.buf_len() as u64);
let msg_len = msg.buf_len() as u64;
let result =
Self::send_msg_internal(&self.peers, &self.foreign_network_client, msg, dst_peer_id)
.await;
let result = Self::send_msg_internal(
&self.peers,
&self.foreign_network_client,
&self.relay_peer_map,
msg,
dst_peer_id,
)
.await;
if result.is_ok() {
self.self_tx_counters.self_tx_bytes.add(msg_len);
self.self_tx_counters.self_tx_packets.inc();
@@ -1053,15 +1116,20 @@ impl PeerManager {
async fn send_msg_internal(
peers: &Arc<PeerMap>,
foreign_network_client: &Arc<ForeignNetworkClient>,
relay_peer_map: &Arc<RelayPeerMap>,
msg: ZCPacket,
dst_peer_id: PeerId,
) -> Result<(), Error> {
let policy =
Self::get_next_hop_policy(msg.peer_manager_header().unwrap().is_latency_first());
if peers.has_peer(dst_peer_id) {
return peers.send_msg_directly(msg, dst_peer_id).await;
}
if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy.clone()).await {
if peers.has_peer(gateway) {
peers.send_msg_directly(msg, gateway).await
relay_peer_map.send_msg(msg, dst_peer_id, policy).await
} else if foreign_network_client.has_next_hop(gateway) {
foreign_network_client.send_msg(msg, gateway).await
} else {
@@ -1174,13 +1242,16 @@ impl PeerManager {
compress_algo: CompressorAlgo,
encryptor: &Arc<dyn Encryptor + 'static>,
msg: &mut ZCPacket,
secure_mode_enabled: bool,
) -> Result<(), Error> {
let compressor = DefaultCompressor {};
compressor
.compress(msg, compress_algo)
.await
.with_context(|| "compress failed")?;
encryptor.encrypt(msg).with_context(|| "encrypt failed")?;
if !secure_mode_enabled {
encryptor.encrypt(msg).with_context(|| "encrypt failed")?;
}
Ok(())
}
@@ -1209,6 +1280,7 @@ impl PeerManager {
return Self::send_msg_internal(
&self.peers,
&self.foreign_network_client,
&self.relay_peer_map,
msg,
cur_to_peer_id,
)
@@ -1229,7 +1301,13 @@ impl PeerManager {
.compress_tx_bytes_before
.add(msg.buf_len() as u64);
Self::try_compress_and_encrypt(self.data_compress_algo, &self.encryptor, &mut msg).await?;
Self::try_compress_and_encrypt(
self.data_compress_algo,
&self.encryptor,
&mut msg,
self.is_secure_mode_enabled,
)
.await?;
self.self_tx_counters
.compress_tx_bytes_after
@@ -1273,9 +1351,14 @@ impl PeerManager {
.add(msg.buf_len() as u64);
self.self_tx_counters.self_tx_packets.inc();
if let Err(e) =
Self::send_msg_internal(&self.peers, &self.foreign_network_client, msg, *peer_id)
.await
if let Err(e) = Self::send_msg_internal(
&self.peers,
&self.foreign_network_client,
&self.relay_peer_map,
msg,
*peer_id,
)
.await
{
errs.push(e);
}
@@ -1301,6 +1384,26 @@ impl PeerManager {
});
}
/// Spawn a background task that periodically evicts relay sessions idle
/// for at least 60s. Runs every 30s; first sweep happens immediately.
async fn run_relay_session_gc_routine(&self) {
let relay_peer_map = self.relay_peer_map.clone();
self.tasks.lock().await.spawn(async move {
loop {
relay_peer_map.evict_idle_sessions(std::time::Duration::from_secs(60));
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
}
});
}
/// Spawn a background task that drops PeerSessions only the store still
/// references. First sweep is delayed by one 60s interval (sleep-first).
async fn run_peer_session_gc_routine(&self) {
let peer_session_store = self.peer_session_store.clone();
self.tasks.lock().await.spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
peer_session_store.evict_unused_sessions();
}
});
}
async fn run_foriegn_network(&self) {
self.peer_rpc_tspt
.foreign_peers
@@ -1322,6 +1425,8 @@ impl PeerManager {
self.start_peer_recv().await;
self.run_clean_peer_without_conn_routine().await;
self.run_relay_session_gc_routine().await;
self.run_peer_session_gc_routine().await;
self.run_foriegn_network().await;
@@ -1332,10 +1437,18 @@ impl PeerManager {
self.peers.clone()
}
pub fn get_relay_peer_map(&self) -> Arc<RelayPeerMap> {
self.relay_peer_map.clone()
}
pub fn get_peer_rpc_mgr(&self) -> Arc<PeerRpcManager> {
self.peer_rpc_mgr.clone()
}
pub fn get_peer_session_store(&self) -> Arc<PeerSessionStore> {
self.peer_session_store.clone()
}
pub fn my_node_id(&self) -> uuid::Uuid {
self.global_ctx.get_id()
}
+10 -9
View File
@@ -146,6 +146,7 @@ impl RoutePeerInfo {
groups: Vec::new(),
quic_port: None,
noise_static_pubkey: Vec::new(),
}
}
@@ -164,6 +165,12 @@ impl RoutePeerInfo {
global_ctx: &ArcGlobalCtx,
) -> Self {
let stun_info = global_ctx.get_stun_info_collector().get_stun_info();
let noise_static_pubkey = global_ctx
.config
.get_secure_mode()
.and_then(|cfg| cfg.public_key().ok())
.map(|pk| pk.as_bytes().to_vec())
.unwrap_or_default();
Self {
peer_id: my_peer_id,
inst_id: Some(global_ctx.get_id().into()),
@@ -197,6 +204,8 @@ impl RoutePeerInfo {
groups: global_ctx.get_acl_groups(my_peer_id),
noise_static_pubkey,
..Default::default()
}
}
@@ -1842,15 +1851,6 @@ impl PeerRouteServiceImpl {
if let Some(last_update) = peer_info.last_update {
let last_update = TryInto::<SystemTime>::try_into(last_update).unwrap();
if last_sync_succ_timestamp.is_some_and(|t| last_update < t) {
tracing::debug!(
"ignore peer_info {:?} because last_update: {:?} is older than last_sync_succ_timestamp: {:?}, peer_infos_count: {}, my_peer_id: {:?}, session: {:?}",
peer_info,
last_update,
last_sync_succ_timestamp,
peer_infos.len(),
self.my_peer_id,
session
);
break;
}
}
@@ -2556,6 +2556,7 @@ impl RouteSessionManager {
continue;
};
session.update_initiator_flag(true);
self.sync_now("update_initiator_flag");
}
// clear sessions that are neither dst_initiator or we_are_initiator.
+105 -8
View File
@@ -1,6 +1,6 @@
use std::{
sync::{
atomic::{AtomicU32, Ordering},
atomic::{AtomicBool, AtomicU32, Ordering},
Arc, Mutex, RwLock,
},
time::{SystemTime, UNIX_EPOCH},
@@ -70,7 +70,29 @@ impl PeerSessionStore {
}
pub fn get(&self, key: &SessionKey) -> Option<Arc<PeerSession>> {
self.sessions.get(key).map(|v| v.clone())
let session = self.sessions.get(key)?.clone();
if session.is_valid() {
Some(session)
} else {
self.sessions.remove(key);
None
}
}
pub fn remove(&self, key: &SessionKey) {
self.sessions.remove(key);
}
pub fn insert_session(&self, key: SessionKey, session: Arc<PeerSession>) {
self.sessions.insert(key, session);
}
/// Remove sessions that are no longer referenced by any PeerConn or RelayPeerMap.
/// A session with strong_count == 1 means only the store holds it — no active
/// connection is using it, so it can be safely cleaned up.
pub fn evict_unused_sessions(&self) {
// NOTE(review): this assumes no new Arc clone can be handed out for an
// entry while retain is evaluating it (i.e. the map's internal locking
// blocks concurrent `get`); confirm this holds for the map type in use.
self.sessions
.retain(|_key, session| Arc::strong_count(session) > 1);
}
pub fn upsert_responder_session(
@@ -79,8 +101,13 @@ impl PeerSessionStore {
a_session_generation: Option<u32>,
send_algorithm: String,
recv_algorithm: String,
peer_static_pubkey: Option<[u8; 32]>,
) -> Result<UpsertResponderSessionReturn, anyhow::Error> {
let existing = self.sessions.get(key).map(|v| v.clone());
let existing = self
.sessions
.get(key)
.map(|v| v.clone())
.filter(|s| s.is_valid());
match existing {
None => {
let root_key = PeerSession::new_root_key();
@@ -93,6 +120,7 @@ impl PeerSessionStore {
initial_epoch,
send_algorithm,
recv_algorithm,
peer_static_pubkey,
));
self.sessions.insert(key.clone(), session.clone());
Ok(UpsertResponderSessionReturn {
@@ -105,6 +133,7 @@ impl PeerSessionStore {
}
Some(session) => {
session.check_encrypt_algo_same(&send_algorithm, &recv_algorithm)?;
session.check_or_set_peer_static_pubkey(peer_static_pubkey)?;
let local_gen = session.session_generation();
if a_session_generation.is_some_and(|g| g == local_gen) {
Ok(UpsertResponderSessionReturn {
@@ -139,6 +168,7 @@ impl PeerSessionStore {
initial_epoch: u32,
send_algorithm: String,
recv_algorithm: String,
peer_static_pubkey: Option<[u8; 32]>,
) -> Result<Arc<PeerSession>, anyhow::Error> {
tracing::info!(
"apply_initiator_action {:?}, send_algorithm: {}, recv_algorithm: {}",
@@ -152,6 +182,7 @@ impl PeerSessionStore {
return Err(anyhow!("no local session for JOIN"));
};
session.check_encrypt_algo_same(&send_algorithm, &recv_algorithm)?;
session.check_or_set_peer_static_pubkey(peer_static_pubkey)?;
if session.session_generation() != b_session_generation {
return Err(anyhow!("JOIN generation mismatch"));
}
@@ -159,6 +190,13 @@ impl PeerSessionStore {
}
PeerSessionAction::Sync | PeerSessionAction::Create => {
let root_key = root_key_32.ok_or_else(|| anyhow!("missing root_key"))?;
// If the existing session is invalidated, remove it so we create a fresh one
if let Some(existing) = self.sessions.get(key) {
if !existing.is_valid() {
drop(existing);
self.sessions.remove(key);
}
}
let session = self
.sessions
.entry(key.clone())
@@ -170,10 +208,12 @@ impl PeerSessionStore {
initial_epoch,
send_algorithm.clone(),
recv_algorithm.clone(),
peer_static_pubkey,
))
})
.clone();
session.check_encrypt_algo_same(&send_algorithm, &recv_algorithm)?;
session.check_or_set_peer_static_pubkey(peer_static_pubkey)?;
session.sync_root_key(root_key, b_session_generation, initial_epoch);
Ok(session)
}
@@ -318,6 +358,7 @@ pub struct PeerSession {
peer_id: PeerId,
root_key: RwLock<[u8; 32]>,
session_generation: AtomicU32,
peer_static_pubkey: RwLock<Option<[u8; 32]>>,
send_epoch: AtomicU32,
send_seq: [AtomicU64; 2],
@@ -329,6 +370,12 @@ pub struct PeerSession {
send_cipher_algorithm: String,
recv_cipher_algorithm: String,
/// Set to true when the session is detected as corrupted (persistent decrypt failures).
/// Holders of Arc<PeerSession> can check this to know the session should be discarded.
invalidated: AtomicBool,
/// Consecutive decrypt failure counter. Auto-invalidates when threshold is reached.
decrypt_fail_count: AtomicU32,
}
impl std::fmt::Debug for PeerSession {
@@ -337,6 +384,7 @@ impl std::fmt::Debug for PeerSession {
.field("peer_id", &self.peer_id)
.field("root_key", &self.root_key)
.field("session_generation", &self.session_generation)
.field("peer_static_pubkey", &self.peer_static_pubkey)
.field("send_epoch", &self.send_epoch)
.field("send_seq", &self.send_seq)
.field("send_epoch_started_ms", &self.send_epoch_started_ms)
@@ -381,6 +429,7 @@ impl PeerSession {
/// stricter security requirements may decrease it.
const ROTATE_AFTER_MS: u64 = 10 * 60 * 1000;
const MAX_ACCEPTED_RX_EPOCH_AHEAD: u32 = 3;
const DECRYPT_FAIL_THRESHOLD: u32 = 10;
pub fn new(
peer_id: PeerId,
@@ -389,11 +438,8 @@ impl PeerSession {
initial_epoch: u32,
send_cipher_algorithm: String,
recv_cipher_algorithm: String,
peer_static_pubkey: Option<[u8; 32]>,
) -> Self {
// let mut root_key_128 = [0u8; 16];
// root_key_128.copy_from_slice(&root_key[..16]);
// let send_cipher = create_encryptor(&send_algorithm, root_key_128, root_key);
// let recv_cipher = create_encryptor(&recv_algorithm, root_key_128, root_key);
let rx_slots = [
[EpochRxSlot::default(), EpochRxSlot::default()],
[EpochRxSlot::default(), EpochRxSlot::default()],
@@ -407,6 +453,7 @@ impl PeerSession {
peer_id,
root_key: RwLock::new(root_key),
session_generation: AtomicU32::new(session_generation),
peer_static_pubkey: RwLock::new(peer_static_pubkey),
send_epoch: AtomicU32::new(initial_epoch),
send_seq: [AtomicU64::new(0), AtomicU64::new(0)],
send_epoch_started_ms: AtomicU64::new(now_ms),
@@ -415,6 +462,8 @@ impl PeerSession {
key_cache: Mutex::new(key_cache),
send_cipher_algorithm,
recv_cipher_algorithm,
invalidated: AtomicBool::new(false),
decrypt_fail_count: AtomicU32::new(0),
}
}
@@ -422,6 +471,15 @@ impl PeerSession {
self.peer_id
}
/// Mark this session as invalid. All holders of Arc<PeerSession> will see this.
pub fn invalidate(&self) {
self.invalidated.store(true, Ordering::Relaxed);
}
pub fn is_valid(&self) -> bool {
!self.invalidated.load(Ordering::Relaxed)
}
pub fn session_generation(&self) -> u32 {
self.session_generation.load(Ordering::Relaxed)
}
@@ -466,6 +524,24 @@ impl PeerSession {
Ok(())
}
/// Pin the remote peer's Noise static public key on first observation and
/// verify it on every subsequent call (trust-on-first-use style pinning).
///
/// Passing `None` is a no-op. Once a key is pinned, any differing key is
/// rejected with an error; the pinned key is never replaced.
pub fn check_or_set_peer_static_pubkey(
    &self,
    peer_static_pubkey: Option<[u8; 32]>,
) -> Result<(), anyhow::Error> {
    // Nothing to verify when the caller has no key material.
    let Some(incoming) = peer_static_pubkey else {
        return Ok(());
    };
    let mut stored = self.peer_static_pubkey.write().unwrap();
    match *stored {
        // First observation: remember the key for all future checks.
        None => {
            *stored = Some(incoming);
            Ok(())
        }
        // Already pinned and identical: accept.
        Some(pinned) if pinned == incoming => Ok(()),
        // Already pinned but different: reject.
        Some(_) => Err(anyhow!("peer static pubkey mismatch")),
    }
}
pub fn sync_root_key(&self, root_key: [u8; 32], session_generation: u32, initial_epoch: u32) {
{
let mut g = self.root_key.write().unwrap();
@@ -703,6 +779,9 @@ impl PeerSession {
receiver_peer_id: PeerId,
pkt: &mut ZCPacket,
) -> Result<(), anyhow::Error> {
if !self.is_valid() {
return Err(anyhow!("session invalidated"));
}
let dir = Self::dir_for_sender(sender_peer_id, receiver_peer_id);
let (epoch, _seq, nonce_bytes) = self.next_nonce(dir);
let encryptor = self
@@ -718,6 +797,9 @@ impl PeerSession {
receiver_peer_id: PeerId,
ciphertext_with_tail: &mut ZCPacket,
) -> Result<(), anyhow::Error> {
if !self.is_valid() {
return Err(anyhow!("session invalidated"));
}
let dir = Self::dir_for_sender(sender_peer_id, receiver_peer_id);
let nonce_bytes =
Self::parse_tail(ciphertext_with_tail.payload()).ok_or_else(|| anyhow!("no tail"))?;
@@ -732,7 +814,19 @@ impl PeerSession {
let encryptor = self
.get_encryptor(epoch, dir, false)
.ok_or_else(|| anyhow!("no key for epoch"))?;
encryptor.decrypt(ciphertext_with_tail)?;
if let Err(e) = encryptor.decrypt(ciphertext_with_tail) {
let count = self.decrypt_fail_count.fetch_add(1, Ordering::Relaxed) + 1;
if count >= Self::DECRYPT_FAIL_THRESHOLD {
self.invalidate();
tracing::warn!(
peer_id = ?self.peer_id,
count,
"session auto-invalidated after consecutive decrypt failures"
);
}
return Err(e.into());
}
self.decrypt_fail_count.store(0, Ordering::Relaxed);
Ok(())
}
@@ -764,6 +858,7 @@ mod tests {
initial_epoch,
"aes-256-gcm".to_string(),
"chacha20-poly1305".to_string(),
None,
);
let sb = PeerSession::new(
a,
@@ -772,6 +867,7 @@ mod tests {
initial_epoch,
"chacha20-poly1305".to_string(),
"aes-256-gcm".to_string(),
None,
);
let plaintext1 = b"hello from a";
@@ -802,6 +898,7 @@ mod tests {
initial_epoch,
"aes-256-gcm".to_string(),
"aes-256-gcm".to_string(),
None,
);
let now = now_ms();
+692
View File
@@ -0,0 +1,692 @@
use std::{sync::Arc, time::Instant};
use dashmap::DashMap;
use hmac::Mac;
use prost::Message;
use snow::params::NoiseParams;
use tokio::sync::{oneshot, Mutex, OwnedMutexGuard};
use tokio::time::{timeout, Duration};
use crate::{
common::error::Error,
common::{global_ctx::ArcGlobalCtx, PeerId},
peers::peer_map::PeerMap,
peers::peer_session::{PeerSession, PeerSessionAction, PeerSessionStore, SessionKey},
peers::route_trait::NextHopPolicy,
proto::peer_rpc::{PeerConnSessionActionPb, RelayNoiseMsg1Pb, RelayNoiseMsg2Pb},
tunnel::packet_def::{PacketType, ZCPacket},
};
/// Wire-format version carried in relay Noise handshake messages.
const RELAY_NOISE_VERSION: u32 = 1;
/// Prologue bytes mixed into the relay Noise handshake.
const RELAY_NOISE_PROLOGUE: &[u8] = b"easytier-relay-noise";
/// Handshake reply timeout, in seconds.
const HANDSHAKE_TIMEOUT_SECS: u64 = 5;
/// Base delay for the exponential backoff between handshake attempts.
const HANDSHAKE_RETRY_BASE_MS: u64 = 200;
/// Maximum handshake attempts before giving up and dropping buffered packets.
const HANDSHAKE_MAX_ATTEMPTS: u32 = 3;
/// Cap on packets buffered per peer while a handshake is in flight.
const MAX_PENDING_PACKETS_PER_PEER: usize = 32;
/// Per-relay-peer bookkeeping: activity timestamp for idle eviction and
/// failure/backoff state for handshake retries.
#[derive(Clone)]
pub struct RelayPeerState {
/// Last time a packet was sent towards this peer (updated in send_msg;
/// presumably consulted by evict_idle_sessions — confirm there).
pub last_active_at: Instant,
/// Handshake failure counter (presumably maintained by
/// register_handshake_failure/success, not visible in this chunk).
pub failure_count: u32,
/// Earliest instant a new handshake may be attempted; None = no backoff.
pub next_retry_at: Option<Instant>,
}
impl Default for RelayPeerState {
fn default() -> Self {
Self {
// A freshly tracked peer counts as active now, with no failures.
last_active_at: Instant::now(),
failure_count: 0,
next_retry_at: None,
}
}
}
/// Manages end-to-end encrypted sessions with peers reachable only through
/// relay forwarding: performs Noise handshakes, buffers packets while a
/// handshake is in flight, and encrypts outgoing relay traffic.
pub struct RelayPeerMap {
/// Direct peer connections and routing; used to resolve next hops.
peer_map: Arc<PeerMap>,
global_ctx: ArcGlobalCtx,
my_peer_id: PeerId,
/// Shared store of established sessions, keyed by (network name, peer id).
peer_session_store: Arc<PeerSessionStore>,
/// Per-peer activity / retry-backoff bookkeeping.
states: DashMap<PeerId, RelayPeerState>,
/// Presumably hands the handshake reply packet to the waiting initiator
/// task — confirm against handshake_session_once.
pending_handshakes: DashMap<PeerId, oneshot::Sender<ZCPacket>>,
/// Per-peer mutex serializing handshake attempts towards one peer.
handshake_locks: DashMap<PeerId, Arc<Mutex<()>>>,
/// Packets queued while a handshake is in progress; flushed on success.
pub(crate) pending_packets: DashMap<PeerId, Vec<(ZCPacket, NextHopPolicy)>>,
}
impl RelayPeerMap {
/// Build a new `RelayPeerMap`, wrapped in an `Arc` so it can be shared
/// across packet-processing and handshake tasks.
pub fn new(
    peer_map: Arc<PeerMap>,
    global_ctx: ArcGlobalCtx,
    my_peer_id: PeerId,
    peer_session_store: Arc<PeerSessionStore>,
) -> Arc<Self> {
    let map = Self {
        peer_map,
        global_ctx,
        my_peer_id,
        peer_session_store,
        // All runtime bookkeeping starts out empty.
        states: Default::default(),
        pending_handshakes: Default::default(),
        handshake_locks: Default::default(),
        pending_packets: Default::default(),
    };
    Arc::new(map)
}
/// Whether secure (relay-encryption) mode is turned on in the local
/// configuration. Absent config section means disabled.
fn is_secure_mode_enabled(&self) -> bool {
    self.global_ctx
        .config
        .get_secure_mode()
        .is_some_and(|cfg| cfg.enabled)
}
fn get_local_keypair(&self) -> Result<(Vec<u8>, Vec<u8>), Error> {
let cfg = self
.global_ctx
.config
.get_secure_mode()
.ok_or_else(|| Error::RouteError(Some("secure mode config not set".to_string())))?;
let private = cfg
.private_key()
.map_err(|e| Error::RouteError(Some(format!("invalid private key: {e:?}"))))?;
let public = cfg
.public_key()
.map_err(|e| Error::RouteError(Some(format!("invalid public key: {e:?}"))))?;
Ok((private.as_bytes().to_vec(), public.as_bytes().to_vec()))
}
async fn get_remote_static_pubkey(&self, peer_id: PeerId) -> Result<Vec<u8>, Error> {
let info = self
.peer_map
.get_route_peer_info(peer_id)
.await
.ok_or_else(|| Error::RouteError(Some("route peer info not found".to_string())))?;
if info.noise_static_pubkey.is_empty() {
return Err(Error::RouteError(Some(
"remote static pubkey not found".to_string(),
)));
}
Ok(info.noise_static_pubkey)
}
/// Get (or lazily create) the per-peer mutex that serializes handshake
/// attempts towards `peer_id`, so only one initiator runs at a time.
fn get_handshake_lock(&self, peer_id: PeerId) -> Arc<Mutex<()>> {
// NOTE(review): entries are never removed, so this map grows with the
// number of distinct relay peers ever contacted — confirm this is
// acceptable, or tie cleanup to the idle-session eviction sweep.
self.handshake_locks
.entry(peer_id)
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
}
/// Wrap a serialized handshake payload in a ZCPacket addressed from the
/// local node to `dst_peer_id` and forward it along the routed next hop.
async fn send_handshake_packet(
    &self,
    payload: Vec<u8>,
    packet_type: PacketType,
    dst_peer_id: PeerId,
    policy: NextHopPolicy,
) -> Result<(), Error> {
    let mut packet = ZCPacket::new_with_payload(&payload);
    packet.fill_peer_manager_hdr(self.my_peer_id, dst_peer_id, packet_type as u8);
    self.send_via_next_hop(packet, dst_peer_id, policy).await
}
async fn send_via_next_hop(
&self,
msg: ZCPacket,
dst_peer_id: PeerId,
policy: NextHopPolicy,
) -> Result<(), Error> {
let Some(next_hop) = self.peer_map.get_gateway_peer_id(dst_peer_id, policy).await else {
return Err(Error::RouteError(None));
};
if !self.peer_map.has_peer(next_hop) {
return Err(Error::RouteError(None));
}
self.peer_map.send_msg_directly(msg, next_hop).await
}
/// Send `msg` to `dst_peer_id` via relay forwarding.
///
/// In secure mode the payload is first encrypted with the per-peer relay
/// session. If no session exists yet, the packet is buffered, a handshake
/// is started in the background, and `Ok(())` is returned — callers must
/// not assume the packet is on the wire when this returns.
pub async fn send_msg(
self: &Arc<Self>,
mut msg: ZCPacket,
dst_peer_id: PeerId,
policy: NextHopPolicy,
) -> Result<(), Error> {
// Record activity for idle-eviction bookkeeping.
let now = Instant::now();
self.states.entry(dst_peer_id).or_default().last_active_at = now;
if self.is_secure_mode_enabled() {
match self.ensure_session(dst_peer_id, policy.clone()).await {
Ok(session) => {
let my_peer_id = self.my_peer_id;
session
.encrypt_payload(my_peer_id, dst_peer_id, &mut msg)
.map_err(|e| Error::RouteError(Some(format!("{e:?}"))))?;
}
Err(_) => {
// Handshake in progress, buffer the packet instead of dropping it
self.buffer_pending_packet(dst_peer_id, msg, policy);
return Ok(());
}
}
}
// Plaintext path (secure mode off) or already-encrypted payload.
self.send_via_next_hop(msg, dst_peer_id, policy).await
}
fn buffer_pending_packet(&self, dst_peer_id: PeerId, pkt: ZCPacket, policy: NextHopPolicy) {
let mut entry = self.pending_packets.entry(dst_peer_id).or_default();
if entry.len() < MAX_PENDING_PACKETS_PER_PEER {
entry.push((pkt, policy));
}
// silently drop when buffer is full
}
/// Drain and send every packet buffered for `dst_peer_id`, encrypting each
/// with the freshly established `session`. Encrypt failures drop the
/// individual packet; send errors are ignored (best effort).
async fn flush_pending_packets(&self, dst_peer_id: PeerId, session: Arc<PeerSession>) {
    let Some((_, packets)) = self.pending_packets.remove(&dst_peer_id) else {
        return;
    };
    if packets.is_empty() {
        return;
    }
    tracing::debug!(
        ?dst_peer_id,
        count = packets.len(),
        "flushing pending packets after relay handshake"
    );
    for (mut pkt, policy) in packets {
        if let Err(e) = session.encrypt_payload(self.my_peer_id, dst_peer_id, &mut pkt) {
            // Previously dropped silently; make the failure visible.
            tracing::warn!(?dst_peer_id, ?e, "failed to encrypt pending packet, dropping");
            continue;
        }
        let _ = self.send_via_next_hop(pkt, dst_peer_id, policy).await;
    }
}
/// True when a valid (non-invalidated) relay session to `dst_peer_id`
/// already exists in the session store for the current network.
pub fn has_session(&self, dst_peer_id: PeerId) -> bool {
    let network_name = self.global_ctx.get_network_identity().network_name.clone();
    let key = SessionKey::new(network_name, dst_peer_id);
    self.peer_session_store.get(&key).is_some()
}
/// Return the established relay session for `dst_peer_id` if one exists.
///
/// When no session exists yet, this starts a background handshake (at most
/// one at a time per peer, guarded by the per-peer lock) and returns an
/// error immediately — callers are expected to buffer the packet rather
/// than wait.
pub async fn ensure_session(
self: &Arc<Self>,
dst_peer_id: PeerId,
policy: NextHopPolicy,
) -> Result<Arc<PeerSession>, Error> {
let network = self.global_ctx.get_network_identity();
let key = SessionKey::new(network.network_name.clone(), dst_peer_id);
if let Some(session) = self.peer_session_store.get(&key) {
return Ok(session);
}
let lock = self.get_handshake_lock(dst_peer_id);
// Only the task that wins try_lock spawns the handshake; losers fall
// through to the Err below while the in-flight handshake proceeds.
if let Ok(guard) = lock.try_lock_owned() {
let self_clone = self.clone();
tokio::spawn(async move {
self_clone
.handshake_session(dst_peer_id, policy, Some(guard))
.await
});
};
// NOTE(review): a handshake that completes between the store lookup
// above and the caller's subsequent buffer_pending_packet call can
// leave that packet queued until the next handshake flushes it —
// confirm this window is acceptable.
Err(Error::RouteError(Some(
"relay handshake in progress".to_string(),
)))
}
/// Drive the relay handshake with `dst_peer_id` to completion, with
/// bounded retries and exponential backoff.
///
/// Returns early (flushing buffered packets) if a session already
/// exists. Honors the per-peer backoff window recorded by
/// `register_handshake_failure`; while backing off, any buffered
/// packets for the peer are discarded. After `HANDSHAKE_MAX_ATTEMPTS`
/// consecutive failures the pending buffer is dropped and the last
/// error is returned.
#[tracing::instrument(skip(self, _lock_guard), level = "debug", ret)]
pub async fn handshake_session(
    &self,
    dst_peer_id: PeerId,
    policy: NextHopPolicy,
    // Held for the whole handshake to serialize attempts per peer;
    // released implicitly when this function returns.
    _lock_guard: Option<OwnedMutexGuard<()>>,
) -> Result<(), Error> {
    let network = self.global_ctx.get_network_identity();
    let key = SessionKey::new(network.network_name.clone(), dst_peer_id);
    if let Some(session) = self.peer_session_store.get(&key) {
        // A concurrent handshake already produced a session; just flush.
        self.flush_pending_packets(dst_peer_id, session).await;
        return Ok(());
    }
    if let Some(next_retry_at) = self.states.get(&dst_peer_id).and_then(|v| v.next_retry_at) {
        if Instant::now() < next_retry_at {
            // Still inside the backoff window: drop buffered packets.
            self.pending_packets.remove(&dst_peer_id);
            return Err(Error::RouteError(Some(
                "relay handshake backoff".to_string(),
            )));
        }
    }
    let mut last_err = None;
    for attempt in 0..HANDSHAKE_MAX_ATTEMPTS {
        let ret = self
            .handshake_session_once(dst_peer_id, policy.clone())
            .await;
        match ret {
            Ok(session) => {
                self.register_handshake_success(dst_peer_id);
                self.flush_pending_packets(dst_peer_id, session).await;
                return Ok(());
            }
            Err(e) => {
                last_err = Some(e);
                self.register_handshake_failure(dst_peer_id, attempt);
                if attempt + 1 < HANDSHAKE_MAX_ATTEMPTS {
                    // Exponential backoff between in-call retries.
                    let backoff = HANDSHAKE_RETRY_BASE_MS.saturating_mul(1 << attempt);
                    tokio::time::sleep(Duration::from_millis(backoff)).await;
                }
            }
        }
    }
    // All attempts failed, drop buffered packets
    self.pending_packets.remove(&dst_peer_id);
    Err(last_err
        .unwrap_or_else(|| Error::RouteError(Some("relay handshake failed".to_string()))))
}
/// One initiator-side Noise IK handshake round-trip with `dst_peer_id`.
///
/// Flow: build a `Noise_IK_25519_ChaChaPoly_SHA256` initiator pinned to
/// the peer's static public key (taken from route info), send msg1
/// carrying our network name, current session generation, a fresh conn
/// id and preferred cipher, then wait (with timeout) for the responder's
/// msg2. The msg2 is validated (conn-id echo; for same-network peers
/// additionally an HMAC secret proof over the handshake hash), and the
/// responder's Join/Sync/Create decision is applied to the session
/// store, yielding the resulting `PeerSession`.
#[tracing::instrument(skip(self), level = "debug", ret)]
async fn handshake_session_once(
    &self,
    dst_peer_id: PeerId,
    policy: NextHopPolicy,
) -> Result<Arc<PeerSession>, Error> {
    let network = self.global_ctx.get_network_identity();
    let session_key = SessionKey::new(network.network_name.clone(), dst_peer_id);
    let (local_private_key, _local_public_key) = self.get_local_keypair()?;
    // The peer's static pubkey comes from route info; IK pins it so the
    // responder is authenticated from the very first message.
    let remote_static = self.get_remote_static_pubkey(dst_peer_id).await?;
    let params: NoiseParams = "Noise_IK_25519_ChaChaPoly_SHA256"
        .parse()
        .map_err(|e| Error::RouteError(Some(format!("parse noise params failed: {e:?}"))))?;
    let builder = snow::Builder::new(params);
    let mut hs = builder
        .prologue(RELAY_NOISE_PROLOGUE)
        .map_err(|e| Error::RouteError(Some(format!("set prologue failed: {e:?}"))))?
        .local_private_key(&local_private_key)
        .map_err(|e| Error::RouteError(Some(format!("set local key failed: {e:?}"))))?
        .remote_public_key(&remote_static)
        .map_err(|e| Error::RouteError(Some(format!("set remote key failed: {e:?}"))))?
        .build_initiator()
        .map_err(|e| Error::RouteError(Some(format!("build initiator failed: {e:?}"))))?;
    // Advertise our current session generation (if any) so the
    // responder can choose Join/Sync instead of a fresh Create.
    let a_session_generation = self
        .peer_session_store
        .get(&session_key)
        .map(|s| s.session_generation());
    let a_conn_id = uuid::Uuid::new_v4();
    let msg1_pb = RelayNoiseMsg1Pb {
        version: RELAY_NOISE_VERSION,
        a_network_name: network.network_name.clone(),
        a_session_generation,
        a_conn_id: Some(a_conn_id.into()),
        client_encryption_algorithm: self.global_ctx.get_flags().encryption_algorithm.clone(),
    };
    let payload = msg1_pb.encode_to_vec();
    let mut out = vec![0u8; 4096];
    let out_len = hs
        .write_message(&payload, &mut out)
        .map_err(|e| Error::RouteError(Some(format!("noise write msg1 failed: {e:?}"))))?;
    // Handshake hash after msg1: the responder computes its secret
    // proof over this same value right after reading msg1 (see
    // handle_relay_msg1), so capture it here for verification below.
    let server_handshake_hash = hs.get_handshake_hash().to_vec();
    // Register the waiter before sending so the ack cannot race past us.
    let (tx, rx) = oneshot::channel();
    self.pending_handshakes.insert(dst_peer_id, tx);
    let send_res = self
        .send_handshake_packet(
            out[..out_len].to_vec(),
            PacketType::RelayHandshake,
            dst_peer_id,
            policy,
        )
        .await;
    if send_res.is_err() {
        self.pending_handshakes.remove(&dst_peer_id);
    }
    send_res?;
    let msg2_pkt = match timeout(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS), rx).await {
        Ok(Ok(pkt)) => pkt,
        Ok(Err(_)) => {
            // Sender half dropped (e.g. race resolution yielded the
            // initiator role) — treat as cancellation.
            self.pending_handshakes.remove(&dst_peer_id);
            return Err(Error::RouteError(Some(
                "relay handshake canceled".to_string(),
            )));
        }
        Err(_) => {
            self.pending_handshakes.remove(&dst_peer_id);
            return Err(Error::RouteError(Some(
                "relay handshake timeout".to_string(),
            )));
        }
    };
    let msg2_pb = self.decode_handshake_message::<RelayNoiseMsg2Pb>(
        PacketType::RelayHandshakeAck,
        &mut hs,
        msg2_pkt,
    )?;
    // The echoed conn id binds msg2 to the msg1 we actually sent.
    if msg2_pb.a_conn_id_echo != Some(a_conn_id.into()) {
        return Err(Error::RouteError(Some(
            "relay msg2 conn_id_echo mismatch".to_string(),
        )));
    }
    if msg2_pb.b_network_name == network.network_name {
        // Same-network peer: it must take role_hint 1 and prove
        // knowledge of the shared network secret via an HMAC over the
        // handshake hash.
        if msg2_pb.role_hint != 1 {
            return Err(Error::RouteError(Some(
                "role_hint must be 1 when network_name is same".to_string(),
            )));
        }
        let Some(secret_proof_32) = msg2_pb.secret_proof_32 else {
            return Err(Error::RouteError(Some(
                "secret_proof_32 must be present when role_hint is 1".to_string(),
            )));
        };
        let verify_result = self
            .global_ctx
            .get_secret_proof(&server_handshake_hash)
            .map(|mac| mac.verify_slice(&secret_proof_32).is_ok());
        if verify_result != Some(true) {
            return Err(Error::RouteError(Some(
                "secret_proof_32 verify failed".to_string(),
            )));
        }
    }
    let action = PeerConnSessionActionPb::try_from(msg2_pb.action)
        .map_err(|_| Error::RouteError(Some("invalid session action".to_string())))?;
    let session_action = match action {
        PeerConnSessionActionPb::Join => PeerSessionAction::Join,
        PeerConnSessionActionPb::Sync => PeerSessionAction::Sync,
        PeerConnSessionActionPb::Create => PeerSessionAction::Create,
    };
    // Only a well-formed 32-byte key is stored on the session.
    let remote_static_key = if remote_static.len() == 32 {
        let mut key = [0u8; 32];
        key.copy_from_slice(&remote_static);
        Some(key)
    } else {
        None
    };
    let root_key_bytes = msg2_pb
        .root_key_32
        .as_deref()
        .filter(|v| v.len() == 32)
        .map(|v| {
            let mut key_bytes = [0u8; 32];
            key_bytes.copy_from_slice(v);
            key_bytes
        });
    let algo = self.global_ctx.get_flags().encryption_algorithm.clone();
    let session = self
        .peer_session_store
        .apply_initiator_action(
            &session_key,
            session_action,
            msg2_pb.b_session_generation,
            root_key_bytes,
            msg2_pb.initial_epoch,
            algo,
            msg2_pb.server_encryption_algorithm.clone(),
            remote_static_key,
        )
        .map_err(|e| Error::RouteError(Some(format!("{e:?}"))))?;
    Ok(session)
}
/// Reset failure tracking for `dst_peer_id` after a successful handshake.
fn register_handshake_success(&self, dst_peer_id: PeerId) {
    let mut state = self.states.entry(dst_peer_id).or_default();
    state.next_retry_at = None;
    state.failure_count = 0;
}
/// Record a failed handshake attempt for `dst_peer_id` and schedule the
/// next allowed retry using exponential backoff.
///
/// The failure counter saturates instead of wrapping. The backoff shift
/// is checked: `1 << attempt` would panic in debug builds (and wrap in
/// release) for `attempt >= 64`, so saturate to the maximum delay
/// instead. For in-range attempts the behavior is unchanged.
fn register_handshake_failure(&self, dst_peer_id: PeerId, attempt: u32) {
    let mut entry = self.states.entry(dst_peer_id).or_default();
    entry.failure_count = entry.failure_count.saturating_add(1);
    // checked_shl returns None on an out-of-range shift; clamp so the
    // backoff simply saturates rather than panicking/wrapping.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    let backoff = HANDSHAKE_RETRY_BASE_MS.saturating_mul(factor);
    entry.next_retry_at = Some(Instant::now() + Duration::from_millis(backoff));
}
/// Decrypt and decode a Noise handshake payload into protobuf `MsgT`.
///
/// Verifies the packet's peer-manager header carries `expected_type`,
/// runs the payload through `hs.read_message` (which advances the Noise
/// handshake state), then protobuf-decodes the resulting plaintext.
fn decode_handshake_message<MsgT: Message + Default>(
    &self,
    expected_type: PacketType,
    hs: &mut snow::HandshakeState,
    pkt: ZCPacket,
) -> Result<MsgT, Error> {
    let hdr = pkt.peer_manager_header().ok_or_else(|| {
        Error::RouteError(Some("packet without peer manager header".to_string()))
    })?;
    if hdr.packet_type != expected_type as u8 {
        return Err(Error::RouteError(Some("packet type mismatch".to_string())));
    }
    // 4096 bytes is the working buffer for the decrypted plaintext;
    // read_message reports the actual plaintext length.
    let mut out = vec![0u8; 4096];
    let out_len = hs
        .read_message(pkt.payload(), &mut out)
        .map_err(|e| Error::RouteError(Some(format!("noise read msg failed: {e:?}"))))?;
    let msg = MsgT::decode(&out[..out_len])
        .map_err(|e| Error::RouteError(Some(format!("decode message failed: {e:?}"))))?;
    Ok(msg)
}
/// Dispatch an incoming relay-handshake control packet.
///
/// `RelayHandshake` (msg1) is processed as responder via
/// `handle_relay_msg1`; `RelayHandshakeAck` (msg2) wakes the initiator
/// task parked in `handshake_session_once`. Other packet types are
/// ignored.
pub async fn handle_handshake_packet(&self, packet: ZCPacket) -> Result<(), Error> {
    let hdr = packet
        .peer_manager_header()
        .ok_or_else(|| Error::RouteError(Some("packet without header".to_string())))?;
    let src_peer_id = hdr.from_peer_id.get();
    let pkt_type = hdr.packet_type;
    if pkt_type == PacketType::RelayHandshake as u8 {
        tracing::debug!("handle_relay_msg1 from {:?}", src_peer_id);
        return self.handle_relay_msg1(packet, src_peer_id).await;
    }
    if pkt_type == PacketType::RelayHandshakeAck as u8 {
        // Ignore a dropped receiver (initiator timed out or canceled).
        if let Some((_, waiter)) = self.pending_handshakes.remove(&src_peer_id) {
            let _ = waiter.send(packet);
        }
    }
    Ok(())
}
/// Responder side: process an incoming RelayHandshake (msg1) and reply
/// with msg2 (RelayHandshakeAck).
///
/// Resolves simultaneous-initiation races deterministically (the peer
/// with the smaller peer id keeps the initiator role), authenticates
/// the initiator's static key against route info, computes the
/// shared-secret proof for same-network peers, upserts the
/// responder-side session, and sends the encrypted msg2 before flushing
/// any packets buffered for the peer.
async fn handle_relay_msg1(&self, msg1: ZCPacket, remote_peer_id: PeerId) -> Result<(), Error> {
    // Check for bidirectional handshake race condition.
    // If we are also waiting for a RelayHandshakeAck from this peer,
    // use deterministic rule: the peer with smaller peer_id becomes initiator.
    if self.pending_handshakes.contains_key(&remote_peer_id) {
        // We have a pending handshake as initiator.
        // If remote_peer_id < my_peer_id, remote should be initiator, we should be responder.
        // Cancel our pending handshake and proceed as responder.
        if remote_peer_id < self.my_peer_id {
            tracing::debug!(
                ?remote_peer_id,
                my_peer_id = ?self.my_peer_id,
                "bidirectional handshake race: yielding initiator role to smaller peer_id"
            );
            // Remove our pending handshake; dropping the sender makes
            // our initiator task observe cancellation.
            self.pending_handshakes.remove(&remote_peer_id);
        } else {
            // We have smaller peer_id, we should remain initiator.
            // Ignore this RelayHandshake and let our initiator flow complete.
            tracing::debug!(
                ?remote_peer_id,
                my_peer_id = ?self.my_peer_id,
                "bidirectional handshake race: keeping initiator role due to smaller peer_id"
            );
            return Err(Error::RouteError(Some(
                "bidirectional handshake race: we are initiator".to_string(),
            )));
        }
    }
    let (local_private_key, _local_public_key) = self.get_local_keypair()?;
    let params: NoiseParams = "Noise_IK_25519_ChaChaPoly_SHA256"
        .parse()
        .map_err(|e| Error::RouteError(Some(format!("parse noise params failed: {e:?}"))))?;
    let builder = snow::Builder::new(params);
    let mut hs = builder
        .prologue(RELAY_NOISE_PROLOGUE)
        .map_err(|e| Error::RouteError(Some(format!("set prologue failed: {e:?}"))))?
        .local_private_key(&local_private_key)
        .map_err(|e| Error::RouteError(Some(format!("set local key failed: {e:?}"))))?
        .build_responder()
        .map_err(|e| Error::RouteError(Some(format!("build responder failed: {e:?}"))))?;
    let msg1_pb = self.decode_handshake_message::<RelayNoiseMsg1Pb>(
        PacketType::RelayHandshake,
        &mut hs,
        msg1,
    )?;
    let remote_network_name = msg1_pb.a_network_name.clone();
    // After reading msg1, IK has revealed the initiator's static key.
    let remote_static = hs
        .get_remote_static()
        .map(|x: &[u8]| x.to_vec())
        .unwrap_or_default();
    let remote_static_key = if remote_static.len() == 32 {
        let mut key = [0u8; 32];
        key.copy_from_slice(&remote_static);
        Some(key)
    } else {
        None
    };
    // Verify initiator's static public key matches the expected key from route info
    let expected_pubkey = self.get_remote_static_pubkey(remote_peer_id).await?;
    if remote_static != expected_pubkey {
        return Err(Error::RouteError(Some(format!(
            "responder: initiator static pubkey mismatch for peer {}, expected {} bytes, got {} bytes",
            remote_peer_id,
            expected_pubkey.len(),
            remote_static.len()
        ))));
    }
    let server_network_name = self.global_ctx.get_network_name();
    // Same-network initiators get role_hint 1 plus an HMAC proof over
    // the post-msg1 handshake hash (the initiator verifies it against
    // the hash it captured after writing msg1).
    let (role_hint, secret_proof_32) = if remote_network_name == server_network_name {
        let proof = self
            .global_ctx
            .get_secret_proof(hs.get_handshake_hash())
            .map(|mac| mac.finalize().into_bytes().to_vec());
        (1, proof)
    } else {
        (2, None)
    };
    let algo = self.global_ctx.get_flags().encryption_algorithm.clone();
    let key = SessionKey::new(server_network_name.clone(), remote_peer_id);
    // Decide Join/Sync/Create based on the initiator's advertised
    // generation and install/refresh the responder-side session.
    let upsert = self
        .peer_session_store
        .upsert_responder_session(
            &key,
            msg1_pb.a_session_generation,
            algo.clone(),
            msg1_pb.client_encryption_algorithm.clone(),
            remote_static_key,
        )
        .map_err(|e| Error::RouteError(Some(format!("{e:?}"))))?;
    let msg2_pb = RelayNoiseMsg2Pb {
        b_network_name: server_network_name,
        role_hint,
        action: match upsert.action {
            PeerSessionAction::Join => PeerConnSessionActionPb::Join as i32,
            PeerSessionAction::Sync => PeerConnSessionActionPb::Sync as i32,
            PeerSessionAction::Create => PeerConnSessionActionPb::Create as i32,
        },
        b_session_generation: upsert.session_generation,
        root_key_32: upsert.root_key.map(|k| k.to_vec()),
        initial_epoch: upsert.initial_epoch,
        b_conn_id: Some(uuid::Uuid::new_v4().into()),
        a_conn_id_echo: msg1_pb.a_conn_id,
        secret_proof_32,
        server_encryption_algorithm: algo,
    };
    let payload = msg2_pb.encode_to_vec();
    let mut out = vec![0u8; 4096];
    let out_len = hs
        .write_message(&payload, &mut out)
        .map_err(|e| Error::RouteError(Some(format!("noise write msg2 failed: {e:?}"))))?;
    self.register_handshake_success(remote_peer_id);
    self.send_handshake_packet(
        out[..out_len].to_vec(),
        PacketType::RelayHandshakeAck,
        remote_peer_id,
        NextHopPolicy::LeastHop,
    )
    .await?;
    // Flush any packets buffered while waiting for the handshake to complete
    self.flush_pending_packets(remote_peer_id, upsert.session)
        .await;
    Ok(())
}
/// Decrypt `packet` in place when secure mode is on and a session with
/// the sender exists.
///
/// Returns `Ok(true)` if the payload was decrypted, `Ok(false)` when
/// secure mode is off or no session is known for the sender (the packet
/// is left untouched). Decryption failures propagate as errors.
///
/// Fix: the per-peer state guard is now dropped before decryption so
/// the map entry is not locked for the duration of the crypto work
/// (the original held the `entry` guard across `decrypt_payload`).
pub fn decrypt_if_needed(&self, packet: &mut ZCPacket) -> Result<bool, Error> {
    if !self.is_secure_mode_enabled() {
        return Ok(false);
    }
    let hdr = packet
        .peer_manager_header()
        .ok_or_else(|| Error::RouteError(Some("packet without header".to_string())))?;
    let from_peer_id = hdr.from_peer_id.get();
    let network = self.global_ctx.get_network_identity();
    let key = SessionKey::new(network.network_name.clone(), from_peer_id);
    let Some(session) = self.peer_session_store.get(&key) else {
        return Ok(false);
    };
    // Update activity in a narrow scope so the map entry guard is not
    // held across the decryption below.
    {
        let now = Instant::now();
        self.states.entry(from_peer_id).or_default().last_active_at = now;
    }
    session.decrypt_payload(from_peer_id, self.my_peer_id, packet)?;
    Ok(true)
}
/// Drop all relay bookkeeping for peers that have been inactive longer
/// than `idle`. Expired keys are collected first, then removed, so no
/// map is mutated while being iterated.
pub fn evict_idle_sessions(&self, idle: Duration) {
    let now = Instant::now();
    let expired: Vec<_> = self
        .states
        .iter()
        .filter(|state| now.duration_since(state.last_active_at) > idle)
        .map(|state| *state.key())
        .collect();
    for peer_id in expired {
        self.states.remove(&peer_id);
        self.pending_handshakes.remove(&peer_id);
        self.handshake_locks.remove(&peer_id);
        self.pending_packets.remove(&peer_id);
    }
}
/// Whether any relay state is currently tracked for `peer_id`.
pub fn has_state(&self, peer_id: PeerId) -> bool {
    self.states.get(&peer_id).is_some()
}
/// Number of consecutive handshake failures recorded for `peer_id`,
/// or `None` when no state is tracked for it.
pub fn failure_count(&self, peer_id: PeerId) -> Option<u32> {
    let state = self.states.get(&peer_id)?;
    Some(state.failure_count)
}
/// True while `peer_id` is inside its handshake-retry backoff window.
pub fn is_backoff_active(&self, peer_id: PeerId) -> bool {
    match self.states.get(&peer_id).and_then(|s| s.next_retry_at) {
        Some(deadline) => Instant::now() < deadline,
        None => false,
    }
}
/// Drop all relay-specific bookkeeping for `peer_id`.
///
/// The encrypted session itself is intentionally left in
/// `PeerSessionStore`: its lifecycle is independent of any particular
/// connection type (relay or direct), and keeping it allows the peer
/// to be used by direct connections or to reconnect quickly
/// (Join instead of Create).
pub fn remove_peer(&self, peer_id: PeerId) {
    self.pending_packets.remove(&peer_id);
    self.pending_handshakes.remove(&peer_id);
    self.handshake_locks.remove(&peer_id);
    self.states.remove(&peer_id);
    tracing::debug!(?peer_id, "RelayPeerMap removed peer relay state");
}
}
+581 -1
View File
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use crate::{
common::{
@@ -9,12 +10,21 @@ use crate::{
},
PeerId,
},
tunnel::ring::create_ring_tunnel_pair,
tunnel::{
common::tests::wait_for_condition,
packet_def::{PacketType, ZCPacket},
ring::create_ring_tunnel_pair,
},
};
use super::{
create_packet_recv_chan,
peer_conn::tests::set_secure_mode_cfg,
peer_manager::{PeerManager, RouteAlgoType},
peer_map::PeerMap,
peer_session::{PeerSession, PeerSessionStore, SessionKey},
relay_peer_map::RelayPeerMap,
route_trait::NextHopPolicy,
};
pub async fn create_mock_peer_manager() -> Arc<PeerManager> {
@@ -37,6 +47,19 @@ pub async fn create_mock_peer_manager_with_name(network_name: String) -> Arc<Pee
peer_mgr
}
/// Build and start a mock `PeerManager` with secure (encrypted relay)
/// mode enabled, bound to the given network identity.
pub async fn create_mock_peer_manager_secure(
    network_name: String,
    network_secret: String,
) -> Arc<PeerManager> {
    let (packet_tx, _packet_rx) = create_packet_recv_chan();
    let identity = NetworkIdentity::new(network_name, network_secret);
    let global_ctx = get_mock_global_ctx_with_network(Some(identity));
    set_secure_mode_cfg(&global_ctx, true);
    let mgr = Arc::new(PeerManager::new(RouteAlgoType::Ospf, global_ctx, packet_tx));
    mgr.run().await.unwrap();
    mgr
}
pub async fn connect_peer_manager(client: Arc<PeerManager>, server: Arc<PeerManager>) {
let (a_ring, b_ring) = create_ring_tunnel_pair();
let a_mgr_copy = client;
@@ -127,3 +150,560 @@ async fn foreign_mgr_stress_test() {
}
}
}
/// A pre-installed `PeerSession` should be returned by `ensure_session`
/// and let `RelayPeerMap` decrypt packets encrypted with the same
/// session material.
#[tokio::test]
async fn relay_peer_map_secure_session_decrypt() {
    let (pkt_tx, _pkt_rx) = create_packet_recv_chan();
    let ident = NetworkIdentity::new("net1".to_string(), "sec1".to_string());
    let ctx = get_mock_global_ctx_with_network(Some(ident));
    set_secure_mode_cfg(&ctx, true);
    let peer_map = Arc::new(PeerMap::new(pkt_tx, ctx.clone(), 10));
    let store = Arc::new(PeerSessionStore::new());
    let relay_map = RelayPeerMap::new(peer_map, ctx.clone(), 10, store.clone());
    let algo = ctx.get_flags().encryption_algorithm.clone();
    // Install a session for peer 20 directly into the shared store.
    let session = Arc::new(PeerSession::new(
        20,
        [7u8; 32],
        1,
        1,
        algo.clone(),
        algo.clone(),
        None,
    ));
    let session_key = SessionKey::new(ctx.get_network_identity().network_name, 20);
    store.insert_session(session_key.clone(), session.clone());
    // ensure_session must hit the store and succeed without a handshake.
    relay_map
        .ensure_session(20, NextHopPolicy::LeastHop)
        .await
        .unwrap();
    assert!(relay_map.has_session(20));
    // Encrypt a packet as peer 20 would, then round-trip through
    // decrypt_if_needed.
    let mut packet = ZCPacket::new_with_payload(b"relay-hello");
    packet.fill_peer_manager_hdr(20, 10, PacketType::Data as u8);
    session.encrypt_payload(20, 10, &mut packet).unwrap();
    assert!(relay_map.decrypt_if_needed(&mut packet).unwrap());
    assert_eq!(packet.payload(), b"relay-hello");
}
/// Exercise two failure paths: a handshake with an unreachable peer
/// must fail and record failure-count/backoff state, and a zero idle
/// threshold must evict the per-peer state created by `send_msg`.
#[tokio::test]
async fn relay_peer_map_retry_backoff_and_evict() {
    // Secure-mode map with no reachable peer: the handshake fails and
    // leaves a failure count plus an active backoff window behind.
    let (tx_secure, _rx_secure) = create_packet_recv_chan();
    let ctx_secure = get_mock_global_ctx();
    set_secure_mode_cfg(&ctx_secure, true);
    let secure_peer_map = Arc::new(PeerMap::new(tx_secure, ctx_secure.clone(), 10));
    let secure_relay = RelayPeerMap::new(
        secure_peer_map,
        ctx_secure.clone(),
        10,
        Arc::new(PeerSessionStore::new()),
    );
    assert!(secure_relay
        .handshake_session(20, NextHopPolicy::LeastHop, None)
        .await
        .is_err());
    assert!(secure_relay.failure_count(20).unwrap_or(0) >= 1);
    assert!(secure_relay.is_backoff_active(20));
    // Plain-mode map: sending creates per-peer state, which a zero idle
    // threshold should evict immediately.
    let (tx_plain, _rx_plain) = create_packet_recv_chan();
    let ctx_plain = get_mock_global_ctx();
    let plain_peer_map = Arc::new(PeerMap::new(tx_plain, ctx_plain.clone(), 30));
    let plain_relay = RelayPeerMap::new(
        plain_peer_map,
        ctx_plain.clone(),
        30,
        Arc::new(PeerSessionStore::new()),
    );
    let mut evict_pkt = ZCPacket::new_with_payload(b"evict");
    evict_pkt.fill_peer_manager_hdr(30, 40, PacketType::Data as u8);
    let _ = plain_relay
        .send_msg(evict_pkt, 40, NextHopPolicy::LeastHop)
        .await;
    assert!(plain_relay.has_state(40));
    plain_relay.evict_idle_sessions(Duration::from_millis(0));
    assert!(!plain_relay.has_state(40));
}
/// Packets sent before a session exists must be buffered per peer (not
/// dropped), the buffer must respect its capacity cap, and
/// `remove_peer` must clear it.
#[tokio::test]
async fn relay_peer_map_pending_packet_buffer() {
    // Verify that packets sent during handshake are buffered (not dropped),
    // and flushed after handshake completes.
    let (s, _r) = create_packet_recv_chan();
    let ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new(
        "net1".to_string(),
        "sec1".to_string(),
    )));
    set_secure_mode_cfg(&ctx, true);
    let peer_map = Arc::new(PeerMap::new(s, ctx.clone(), 10));
    let store = Arc::new(PeerSessionStore::new());
    let relay_map = RelayPeerMap::new(peer_map, ctx.clone(), 10, store.clone());
    // Send multiple packets while no session exists (handshake will fail, but packets should be buffered)
    for i in 0..5u8 {
        let mut pkt = ZCPacket::new_with_payload(&[i]);
        pkt.fill_peer_manager_hdr(10, 20, PacketType::Data as u8);
        let _ = relay_map.send_msg(pkt, 20, NextHopPolicy::LeastHop).await;
    }
    // Verify packets were buffered
    assert_eq!(
        relay_map
            .pending_packets
            .get(&20)
            .map(|v| v.len())
            .unwrap_or(0),
        5,
        "5 packets should be buffered during handshake"
    );
    // Verify buffer respects capacity limit
    for i in 0..50u8 {
        let mut pkt = ZCPacket::new_with_payload(&[i]);
        pkt.fill_peer_manager_hdr(10, 20, PacketType::Data as u8);
        let _ = relay_map.send_msg(pkt, 20, NextHopPolicy::LeastHop).await;
    }
    let buffered = relay_map
        .pending_packets
        .get(&20)
        .map(|v| v.len())
        .unwrap_or(0);
    assert!(
        buffered <= 32,
        "buffer should not exceed MAX_PENDING_PACKETS_PER_PEER, got {buffered}"
    );
    // Verify remove_peer clears pending packets
    relay_map.remove_peer(20);
    assert_eq!(
        relay_map
            .pending_packets
            .get(&20)
            .map(|v| v.len())
            .unwrap_or(0),
        0,
        "pending packets should be cleared on peer removal"
    );
}
/// After a successful handshake with a relayed peer (A—B—C topology),
/// packets pre-buffered for that peer must be flushed out of the
/// pending queue.
#[tokio::test]
async fn relay_peer_map_pending_packets_flushed_on_handshake_success() {
    // Test that pending packets are flushed after handshake succeeds.
    // We pre-populate the buffer, then run handshake, and verify it's cleared.
    let peer_a = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    let peer_b = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    let peer_c = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    connect_peer_manager(peer_a.clone(), peer_b.clone()).await;
    connect_peer_manager(peer_b.clone(), peer_c.clone()).await;
    let peer_a_id = peer_a.my_peer_id();
    let peer_c_id = peer_c.my_peer_id();
    // Wait for routes to propagate
    wait_for_condition(
        || {
            let peer_a = peer_a.clone();
            let peer_c = peer_c.clone();
            async move { wait_route_appear(peer_a.clone(), peer_c).await.is_ok() }
        },
        Duration::from_secs(10),
    )
    .await;
    // Wait for noise_static_pubkey to be available on both sides
    wait_for_condition(
        || {
            let peer_a = peer_a.clone();
            async move {
                peer_a
                    .get_peer_map()
                    .get_route_peer_info(peer_c_id)
                    .await
                    .map(|info| !info.noise_static_pubkey.is_empty())
                    .unwrap_or(false)
            }
        },
        Duration::from_secs(10),
    )
    .await;
    let relay_a = peer_a.get_relay_peer_map();
    // Pre-populate pending packets buffer (simulating what send_msg does during handshake)
    for i in 0..3u8 {
        let mut pkt = ZCPacket::new_with_payload(&[i]);
        pkt.fill_peer_manager_hdr(peer_a_id, peer_c_id, PacketType::Data as u8);
        relay_a
            .pending_packets
            .entry(peer_c_id)
            .or_default()
            .push((pkt, NextHopPolicy::LeastHop));
    }
    assert_eq!(
        relay_a
            .pending_packets
            .get(&peer_c_id)
            .map(|v| v.len())
            .unwrap_or(0),
        3,
        "3 packets should be in the buffer"
    );
    // Run handshake — on success it should flush the buffer
    relay_a
        .handshake_session(peer_c_id, NextHopPolicy::LeastHop, None)
        .await
        .unwrap();
    // Verify session established and buffer cleared
    assert!(relay_a.has_session(peer_c_id));
    assert_eq!(
        relay_a
            .pending_packets
            .get(&peer_c_id)
            .map(|v| v.len())
            .unwrap_or(0),
        0,
        "pending packets should be flushed after successful handshake"
    );
}
/// End-to-end happy path: with secure mode on and an A—B—C topology,
/// A completes a relay handshake with C through forwarding node B, and
/// both endpoints end up with an established session.
#[tokio::test]
async fn relay_peer_map_real_link_handshake_success() {
    let peer_a = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    let peer_b = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    let peer_c = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    connect_peer_manager(peer_a.clone(), peer_b.clone()).await;
    connect_peer_manager(peer_b.clone(), peer_c.clone()).await;
    let peer_a_id = peer_a.my_peer_id();
    let peer_b_id = peer_b.my_peer_id();
    let peer_c_id = peer_c.my_peer_id();
    // Wait until A can see a route to C.
    wait_for_condition(
        || {
            let peer_a = peer_a.clone();
            let peer_c = peer_c.clone();
            async move { wait_route_appear(peer_a.clone(), peer_c).await.is_ok() }
        },
        Duration::from_secs(10),
    )
    .await;
    // Ensure A actually reaches C via B (i.e. relayed, not direct).
    wait_for_condition(
        || {
            let peer_a = peer_a.clone();
            async move {
                peer_a
                    .get_peer_map()
                    .get_gateway_peer_id(peer_c_id, NextHopPolicy::LeastHop)
                    .await
                    == Some(peer_b_id)
            }
        },
        Duration::from_secs(5),
    )
    .await;
    // The handshake needs C's noise static pubkey from route info.
    wait_for_condition(
        || {
            let peer_a = peer_a.clone();
            async move {
                peer_a
                    .get_peer_map()
                    .get_route_peer_info(peer_c_id)
                    .await
                    .map(|info| !info.noise_static_pubkey.is_empty())
                    .unwrap_or(false)
            }
        },
        Duration::from_secs(10),
    )
    .await;
    let relay_a = peer_a.get_relay_peer_map();
    let relay_c = peer_c.get_relay_peer_map();
    relay_a
        .handshake_session(peer_c_id, NextHopPolicy::LeastHop, None)
        .await
        .unwrap();
    // Both ends must observe the established session.
    wait_for_condition(
        || {
            let relay_a = relay_a.clone();
            async move { relay_a.has_session(peer_c_id) }
        },
        Duration::from_secs(5),
    )
    .await;
    wait_for_condition(
        || {
            let relay_c = relay_c.clone();
            async move { relay_c.has_session(peer_a_id) }
        },
        Duration::from_secs(5),
    )
    .await;
}
/// NOTE(review): despite the name, this test only exercises the
/// *matching*-pubkey success path — the rejection branch (responder
/// refusing a mismatched initiator static key) is never triggered here.
/// Consider renaming, or injecting a tampered pubkey into route info to
/// actually cover the rejection.
#[tokio::test]
async fn relay_peer_map_responder_rejects_mismatched_pubkey() {
    // Create three peers: A -> B -> C
    let peer_a = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    let peer_b = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    let peer_c = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    connect_peer_manager(peer_a.clone(), peer_b.clone()).await;
    connect_peer_manager(peer_b.clone(), peer_c.clone()).await;
    let peer_a_id = peer_a.my_peer_id();
    let peer_c_id = peer_c.my_peer_id();
    // Wait for routes to propagate
    wait_for_condition(
        || {
            let peer_a = peer_a.clone();
            let peer_c = peer_c.clone();
            async move { wait_route_appear(peer_a.clone(), peer_c).await.is_ok() }
        },
        Duration::from_secs(10),
    )
    .await;
    // Wait for noise_static_pubkey to be available
    wait_for_condition(
        || {
            let peer_a = peer_a.clone();
            async move {
                peer_a
                    .get_peer_map()
                    .get_route_peer_info(peer_c_id)
                    .await
                    .map(|info| !info.noise_static_pubkey.is_empty())
                    .unwrap_or(false)
            }
        },
        Duration::from_secs(10),
    )
    .await;
    // Get the original correct pubkey to verify it exists
    let original_info = peer_a
        .get_peer_map()
        .get_route_peer_info(peer_c_id)
        .await
        .expect("should have route info for peer_c");
    assert!(
        !original_info.noise_static_pubkey.is_empty(),
        "noise_static_pubkey should be present"
    );
    // Attempt handshake - this should succeed because pubkeys match
    let relay_a = peer_a.get_relay_peer_map();
    let result = relay_a
        .handshake_session(peer_c_id, NextHopPolicy::LeastHop, None)
        .await;
    // The handshake should succeed because the pubkeys match
    assert!(
        result.is_ok(),
        "handshake should succeed with matching pubkeys"
    );
    // Verify session was established on both sides
    wait_for_condition(
        || {
            let relay_a = relay_a.clone();
            async move { relay_a.has_session(peer_c_id) }
        },
        Duration::from_secs(5),
    )
    .await;
    let relay_c = peer_c.get_relay_peer_map();
    wait_for_condition(
        || {
            let relay_c = relay_c.clone();
            async move { relay_c.has_session(peer_a_id) }
        },
        Duration::from_secs(5),
    )
    .await;
}
/// `remove_peer` must clear relay bookkeeping but leave the encrypted
/// session in the shared store, since the session lifecycle is
/// independent of relay state.
#[tokio::test]
async fn relay_peer_map_remove_peer() {
    let (pkt_tx, _pkt_rx) = create_packet_recv_chan();
    let ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new(
        "net1".to_string(),
        "sec1".to_string(),
    )));
    set_secure_mode_cfg(&ctx, true);
    let peer_map = Arc::new(PeerMap::new(pkt_tx, ctx.clone(), 10));
    let store = Arc::new(PeerSessionStore::new());
    let relay_map = RelayPeerMap::new(peer_map, ctx.clone(), 10, store.clone());
    let target_peer: PeerId = 100;
    // Install a session for the target peer directly into the store.
    let session = Arc::new(PeerSession::new(
        target_peer,
        [1u8; 32],
        1,
        0,
        "aes-256-gcm".to_string(),
        "aes-256-gcm".to_string(),
        None,
    ));
    let session_key = SessionKey::new(ctx.get_network_name(), target_peer);
    store.insert_session(session_key.clone(), session);
    assert!(store.get(&session_key).is_some());
    // Removing relay state must not evict the stored session.
    relay_map.remove_peer(target_peer);
    assert!(
        store.get(&session_key).is_some(),
        "session should persist after relay peer removal"
    );
}
/// Test bidirectional handshake race resolution.
/// When both peers simultaneously initiate handshake, the one with smaller peer_id
/// should become initiator, and the other should yield and become responder.
#[tokio::test]
async fn relay_peer_map_bidirectional_handshake_race() {
    // Create three peers: A -> B -> C
    let peer_a = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    let peer_b = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    let peer_c = create_mock_peer_manager_secure("net1".to_string(), "sec1".to_string()).await;
    connect_peer_manager(peer_a.clone(), peer_b.clone()).await;
    connect_peer_manager(peer_b.clone(), peer_c.clone()).await;
    let peer_a_id = peer_a.my_peer_id();
    let peer_c_id = peer_c.my_peer_id();
    // Wait for routes to propagate
    wait_for_condition(
        || {
            let peer_a = peer_a.clone();
            let peer_c = peer_c.clone();
            async move { wait_route_appear(peer_a.clone(), peer_c).await.is_ok() }
        },
        Duration::from_secs(10),
    )
    .await;
    // Wait for noise_static_pubkey to be available
    wait_for_condition(
        || {
            let peer_a = peer_a.clone();
            async move {
                peer_a
                    .get_peer_map()
                    .get_route_peer_info(peer_c_id)
                    .await
                    .map(|info| !info.noise_static_pubkey.is_empty())
                    .unwrap_or(false)
            }
        },
        Duration::from_secs(10),
    )
    .await;
    // Both directions need the pubkey, since both sides will initiate.
    wait_for_condition(
        || {
            let peer_c = peer_c.clone();
            async move {
                peer_c
                    .get_peer_map()
                    .get_route_peer_info(peer_a_id)
                    .await
                    .map(|info| !info.noise_static_pubkey.is_empty())
                    .unwrap_or(false)
            }
        },
        Duration::from_secs(10),
    )
    .await;
    // Simulate bidirectional handshake race by having both sides initiate simultaneously
    let relay_a = peer_a.get_relay_peer_map();
    let relay_c = peer_c.get_relay_peer_map();
    // Both sides initiate handshake at the same time
    let handle_a = tokio::spawn({
        let relay_a = relay_a.clone();
        async move {
            relay_a
                .handshake_session(peer_c_id, NextHopPolicy::LeastHop, None)
                .await
        }
    });
    let handle_c = tokio::spawn({
        let relay_c = relay_c.clone();
        async move {
            relay_c
                .handshake_session(peer_a_id, NextHopPolicy::LeastHop, None)
                .await
        }
    });
    // Wait for both handshakes to complete
    let (result_a, result_c) = tokio::join!(handle_a, handle_c);
    // At least one should succeed (the initiator with smaller peer_id)
    // Both could succeed if race resolution worked correctly
    tracing::info!(
        ?peer_a_id,
        ?peer_c_id,
        ?result_a,
        ?result_c,
        "bidirectional handshake results"
    );
    // Wait for sessions to be established
    wait_for_condition(
        || {
            let relay_a = relay_a.clone();
            async move { relay_a.has_session(peer_c_id) }
        },
        Duration::from_secs(5),
    )
    .await;
    wait_for_condition(
        || {
            let relay_c = relay_c.clone();
            async move { relay_c.has_session(peer_a_id) }
        },
        Duration::from_secs(5),
    )
    .await;
    // Both sides should have sessions after race resolution
    assert!(
        relay_a.has_session(peer_c_id),
        "peer_a should have session with peer_c"
    );
    assert!(
        relay_c.has_session(peer_a_id),
        "peer_c should have session with peer_a"
    );
}