feat(credential): enforce signed credential distribution across mixed admin/shared topology (#1972)

This commit is contained in:
KKRainbow
2026-03-10 08:37:33 +08:00
committed by GitHub
parent ef44027f57
commit 694b8d349d
15 changed files with 894 additions and 186 deletions
+65 -30
View File
@@ -34,7 +34,7 @@ use crate::{
proto::{
api::instance::{ForeignNetworkEntryPb, ListForeignNetworkResponse, PeerInfo},
common::LimiterConfig,
peer_rpc::DirectConnectorRpcServer,
peer_rpc::{DirectConnectorRpcServer, PeerIdentityType},
},
tunnel::packet_def::{PacketType, ZCPacket},
use_global_var,
@@ -94,6 +94,7 @@ impl ForeignNetworkEntry {
my_peer_id: PeerId,
global_ctx: ArcGlobalCtx,
relay_data: bool,
peer_session_store: Arc<PeerSessionStore>,
pm_packet_sender: PacketRecvChan,
) -> Self {
let stats_mgr = global_ctx.stats_manager().clone();
@@ -107,9 +108,9 @@ impl ForeignNetworkEntry {
foreign_global_ctx.clone(),
my_peer_id,
));
let peer_session_store = Arc::new(PeerSessionStore::new());
let relay_peer_map = RelayPeerMap::new(
peer_map.clone(),
None,
foreign_global_ctx.clone(),
my_peer_id,
peer_session_store.clone(),
@@ -181,6 +182,7 @@ impl ForeignNetworkEntry {
PUBLIC_SERVER_HOSTNAME_PREFIX,
global_ctx.get_hostname()
)));
config.set_secure_mode(global_ctx.config.get_secure_mode());
let mut flags = config.get_flags();
flags.disable_relay_kcp = !global_ctx.get_flags().enable_relay_foreign_network_kcp;
@@ -349,42 +351,64 @@ impl ForeignNetworkEntry {
.get_counter(MetricName::TrafficPacketsRx, label_set.clone());
self.tasks.lock().await.spawn(async move {
while let Ok(zc_packet) = recv_packet_from_chan(&mut recv).await {
while let Ok(mut zc_packet) = recv_packet_from_chan(&mut recv).await {
let buf_len = zc_packet.buf_len();
let Some(hdr) = zc_packet.peer_manager_header() else {
tracing::warn!("invalid packet, skip");
continue;
};
tracing::trace!(?hdr, "recv packet in foreign network manager");
let from_peer_id = hdr.from_peer_id.get();
let packet_type = hdr.packet_type;
let len = hdr.len.get();
let to_peer_id = hdr.to_peer_id.get();
if to_peer_id == my_node_id {
if hdr.packet_type == PacketType::RelayHandshake as u8
|| hdr.packet_type == PacketType::RelayHandshakeAck as u8
if packet_type == PacketType::RelayHandshake as u8
|| packet_type == PacketType::RelayHandshakeAck as u8
{
let _ = relay_peer_map.handle_handshake_packet(zc_packet).await;
continue;
}
if hdr.packet_type == PacketType::TaRpc as u8
|| hdr.packet_type == PacketType::RpcReq as u8
|| hdr.packet_type == PacketType::RpcResp as u8
if !peer_map.has_peer(from_peer_id) && relay_peer_map.is_secure_mode_enabled() {
match relay_peer_map.decrypt_if_needed(&mut zc_packet).await {
Ok(true) => {}
Ok(false) => {
tracing::error!("relay session not found");
continue;
}
Err(e) => {
tracing::error!(?e, "relay decrypt failed");
continue;
}
}
}
if packet_type == PacketType::TaRpc as u8
|| packet_type == PacketType::RpcReq as u8
|| packet_type == PacketType::RpcResp as u8
{
rx_bytes.add(buf_len as u64);
rx_packets.inc();
rpc_sender.send(zc_packet).unwrap();
continue;
}
tracing::trace!(?hdr, "ignore packet in foreign network");
tracing::trace!(
?packet_type,
?len,
?from_peer_id,
?to_peer_id,
"ignore packet in foreign network"
);
} else {
if hdr.packet_type == PacketType::Data as u8
|| hdr.packet_type == PacketType::KcpSrc as u8
|| hdr.packet_type == PacketType::KcpDst as u8
|| hdr.packet_type == PacketType::RelayHandshake as u8
|| hdr.packet_type == PacketType::RelayHandshakeAck as u8
if packet_type == PacketType::Data as u8
|| packet_type == PacketType::KcpSrc as u8
|| packet_type == PacketType::KcpDst as u8
{
if !relay_data {
continue;
}
if !bps_limiter.try_consume(hdr.len.into()) {
if !bps_limiter.try_consume(len.into()) {
continue;
}
}
@@ -398,7 +422,19 @@ impl ForeignNetworkEntry {
match gateway_peer_id {
Some(peer_id) if peer_map.has_peer(peer_id) => {
if let Err(e) = peer_map.send_msg_directly(zc_packet, peer_id).await {
if peer_id != to_peer_id && hdr.from_peer_id.get() == my_node_id {
if let Err(e) = relay_peer_map
.send_msg(zc_packet, to_peer_id, NextHopPolicy::LeastHop)
.await
{
tracing::error!(
?e,
"send packet to foreign peer inside relay peer map failed"
);
}
} else if let Err(e) =
peer_map.send_msg_directly(zc_packet, peer_id).await
{
tracing::error!(
?e,
"send packet to foreign peer inside peer map failed"
@@ -437,21 +473,10 @@ impl ForeignNetworkEntry {
});
}
async fn run_peer_session_gc_routine(&self) {
let peer_session_store = self.peer_session_store.clone();
self.tasks.lock().await.spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
peer_session_store.evict_unused_sessions();
}
});
}
async fn prepare(&self, accessor: Box<dyn GlobalForeignNetworkAccessor>) {
self.prepare_route(accessor).await;
self.start_packet_recv().await;
self.run_relay_session_gc_routine().await;
self.run_peer_session_gc_routine().await;
self.peer_rpc.run();
self.peer_center.init().await;
}
@@ -463,6 +488,8 @@ impl Drop for ForeignNetworkEntry {
.rpc_server()
.registry()
.unregister_by_domain(&self.network.network_name);
self.global_ctx
.remove_trusted_keys(&self.network.network_name);
tracing::debug!(self.my_peer_id, ?self.network, "drop foreign network entry");
}
@@ -528,6 +555,7 @@ impl ForeignNetworkManagerData {
self.network_peer_last_update.remove(network_name);
}
#[allow(clippy::too_many_arguments)]
async fn get_or_insert_entry(
&self,
network_identity: &NetworkIdentity,
@@ -535,6 +563,7 @@ impl ForeignNetworkManagerData {
dst_peer_id: PeerId,
relay_data: bool,
global_ctx: &ArcGlobalCtx,
peer_session_store: Arc<PeerSessionStore>,
pm_packet_sender: &PacketRecvChan,
) -> (Arc<ForeignNetworkEntry>, bool) {
let mut new_added = false;
@@ -550,6 +579,7 @@ impl ForeignNetworkManagerData {
my_peer_id,
global_ctx.clone(),
relay_data,
peer_session_store,
pm_packet_sender.clone(),
))
})
@@ -578,6 +608,7 @@ pub const FOREIGN_NETWORK_SERVICE_ID: u32 = 1;
pub struct ForeignNetworkManager {
my_peer_id: PeerId,
global_ctx: ArcGlobalCtx,
peer_session_store: Arc<PeerSessionStore>,
packet_sender_to_mgr: PacketRecvChan,
data: Arc<ForeignNetworkManagerData>,
@@ -589,6 +620,7 @@ impl ForeignNetworkManager {
pub fn new(
my_peer_id: PeerId,
global_ctx: ArcGlobalCtx,
peer_session_store: Arc<PeerSessionStore>,
packet_sender_to_mgr: PacketRecvChan,
accessor: Box<dyn GlobalForeignNetworkAccessor>,
) -> Self {
@@ -606,6 +638,7 @@ impl ForeignNetworkManager {
Self {
my_peer_id,
global_ctx,
peer_session_store,
packet_sender_to_mgr,
data,
@@ -641,13 +674,15 @@ impl ForeignNetworkManager {
peer_conn.get_peer_id(),
ret.is_ok(),
&self.global_ctx,
self.peer_session_store.clone(),
&self.packet_sender_to_mgr,
)
.await;
let _g = entry.lock.lock().await;
if entry.network != peer_conn.get_network_identity()
if (entry.network != peer_conn.get_network_identity()
&& peer_conn.get_peer_identity_type() != PeerIdentityType::SharedNode)
|| entry.my_peer_id != peer_conn.get_my_peer_id()
{
if new_added {
@@ -770,7 +805,7 @@ impl ForeignNetworkManager {
.map(|v| *v)
}
pub async fn send_msg_to_peer(
pub async fn forward_foreign_network_packet(
&self,
network_name: &str,
dst_peer_id: PeerId,
@@ -778,7 +813,7 @@ impl ForeignNetworkManager {
) -> Result<(), Error> {
if let Some(entry) = self.data.get_network_entry(network_name) {
entry
.relay_peer_map
.peer_map
.send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop)
.await
} else {