From 694b8d349ddf0b520ba52b43222779f843bd6320 Mon Sep 17 00:00:00 2001 From: KKRainbow <443152178@qq.com> Date: Tue, 10 Mar 2026 08:37:33 +0800 Subject: [PATCH] feat(credential): enforce signed credential distribution across mixed admin/shared topology (#1972) --- easytier/src/common/global_ctx.rs | 77 +++- easytier/src/connector/udp_hole_punch/cone.rs | 3 +- easytier/src/connector/udp_hole_punch/mod.rs | 4 +- easytier/src/peers/credential_manager.rs | 69 ++-- easytier/src/peers/foreign_network_manager.rs | 95 +++-- easytier/src/peers/peer_conn.rs | 17 +- easytier/src/peers/peer_manager.rs | 29 +- easytier/src/peers/peer_ospf_route.rs | 367 ++++++++++++++++-- easytier/src/peers/peer_session.rs | 98 ++++- easytier/src/peers/relay_peer_map.rs | 81 ++-- easytier/src/peers/rpc_service.rs | 5 + easytier/src/peers/tests.rs | 10 +- easytier/src/proto/peer_rpc.proto | 11 +- easytier/src/proto/peer_rpc.rs | 68 ++++ easytier/src/tests/credential_tests.rs | 146 ++++++- 15 files changed, 894 insertions(+), 186 deletions(-) diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index dd60505d..ef6eafea 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -8,8 +8,10 @@ use std::{ }; use arc_swap::ArcSwap; +use dashmap::DashMap; use crate::common::config::ProxyNetworkConfig; +use crate::common::shrink_dashmap; use crate::common::stats_manager::StatsManager; use crate::common::token_bucket::TokenBucketManager; use crate::peers::acl_filter::AclFilter; @@ -101,6 +103,53 @@ impl TrustedKeyMetadata { } } +// key is (pubkey, network-name) +pub type TrustedKeyMap = HashMap, TrustedKeyMetadata>; + +struct TrustedKeyMapManager { + network_trusted_keys: DashMap>, +} + +impl TrustedKeyMapManager { + pub fn new() -> Self { + Self { + network_trusted_keys: DashMap::new(), + } + } + + pub fn update_trusted_keys(&self, network_name: &str, trusted_keys: TrustedKeyMap) { + match self.network_trusted_keys.entry(network_name.to_string()) { + dashmap::Entry::Vacant(entry) => { + entry.insert(ArcSwap::new(Arc::new(trusted_keys))); + } + dashmap::Entry::Occupied(entry) => { + entry.get().store(Arc::new(trusted_keys)); + } + } + } + + pub fn remove_trusted_keys(&self, network_name: &str) { + self.network_trusted_keys.remove(network_name); + shrink_dashmap(&self.network_trusted_keys, None); + } + + pub fn verify_trusted_key(&self, pubkey: &[u8], network_name: &str) -> bool { + let Some(trusted_keys) = self + .network_trusted_keys + .get(network_name) + .map(|v| v.load_full()) + else { + return false; + }; + + let Some(metadata) = trusted_keys.get(&pubkey.to_vec()) else { + return false; + }; + + !metadata.is_expired() + } +} + pub struct GlobalCtx { pub inst_name: String, pub id: uuid::Uuid, @@ -139,7 +188,7 @@ pub struct GlobalCtx { /// OSPF propagated trusted keys (peer pubkeys and admin credentials) /// Stored in ArcSwap for lock-free reads and atomic batch updates - trusted_keys: ArcSwap, TrustedKeyMetadata>>, + trusted_keys: Arc, } impl std::fmt::Debug for GlobalCtx { @@ -236,7 +285,7 @@ impl GlobalCtx { credential_manager, - trusted_keys: ArcSwap::new(Arc::new(HashMap::new())), + trusted_keys: Arc::new(TrustedKeyMapManager::new()), } } @@ -461,22 +510,28 @@ impl GlobalCtx { /// Check if a public key is trusted using two-level lookup: /// 1. OSPF propagated trusted_keys (lock-free) /// 2. Local credential_manager - pub fn is_pubkey_trusted(&self, pubkey: &[u8]) -> bool { + pub fn is_pubkey_trusted(&self, pubkey: &[u8], network_name: &str) -> bool { // First level: check OSPF propagated keys (lock-free) - let keys = self.trusted_keys.load(); - if let Some(metadata) = keys.get(pubkey) { - return !metadata.is_expired(); + if self.trusted_keys.verify_trusted_key(pubkey, network_name) { + return true; } - drop(keys); - // Second level: check local credential_manager - self.credential_manager.is_pubkey_trusted(pubkey) + // Second level: check local credential_manager if in the same network + if network_name == self.get_network_name() { + return self.credential_manager.is_pubkey_trusted(pubkey); + } + + false } /// Atomically replace all OSPF trusted keys with a new set /// Called by OSPF route layer after each route update - pub fn update_trusted_keys(&self, keys: HashMap, TrustedKeyMetadata>) { - self.trusted_keys.store(Arc::new(keys)); + pub fn update_trusted_keys(&self, keys: TrustedKeyMap, network_name: &str) { + self.trusted_keys.update_trusted_keys(network_name, keys); + } + + pub fn remove_trusted_keys(&self, network_name: &str) { + self.trusted_keys.remove_trusted_keys(network_name); } pub fn get_acl_groups(&self, peer_id: PeerId) -> Vec { diff --git a/easytier/src/connector/udp_hole_punch/cone.rs b/easytier/src/connector/udp_hole_punch/cone.rs index d28948a3..69b2d761 100644 --- a/easytier/src/connector/udp_hole_punch/cone.rs +++ b/easytier/src/connector/udp_hole_punch/cone.rs @@ -98,7 +98,6 @@ impl PunchConeHoleClient { } } - #[tracing::instrument(skip(self))] pub(crate) async fn do_hole_punching( &self, dst_peer_id: PeerId, @@ -241,7 +240,7 @@ impl PunchConeHoleClient { } } - return Ok(None); + Ok(None) } } diff --git a/easytier/src/connector/udp_hole_punch/mod.rs b/easytier/src/connector/udp_hole_punch/mod.rs index b8c6249a..8c61a72d 100644 --- a/easytier/src/connector/udp_hole_punch/mod.rs +++ b/easytier/src/connector/udp_hole_punch/mod.rs @@ -245,7 +245,7 @@ impl UdpHoePunchConnectorData { tracing::info!(?tunnel, "hole punching get tunnel success"); if let Err(e) = self.peer_mgr.add_client_tunnel(tunnel, false).await { - tracing::warn!(?e, "add client tunnel failed"); + tracing::warn!("add client tunnel failed, err: {}", e); op(true); false } else { @@ -258,7 +258,7 @@ impl UdpHoePunchConnectorData { false } Err(e) => { - tracing::info!(?e, "hole punching failed"); + tracing::info!("hole punching failed, err: {}", e); op(true); false } diff --git a/easytier/src/peers/credential_manager.rs b/easytier/src/peers/credential_manager.rs index 6ed5ecad..b2a23954 100644 --- a/easytier/src/peers/credential_manager.rs +++ b/easytier/src/peers/credential_manager.rs @@ -10,7 +10,7 @@ use base64::Engine; use serde::{Deserialize, Serialize}; use x25519_dalek::{PublicKey, StaticSecret}; -use crate::proto::peer_rpc::TrustedCredentialPubkey; +use crate::proto::peer_rpc::{TrustedCredentialPubkey, TrustedCredentialPubkeyProof}; #[derive(Debug, Clone, Serialize, Deserialize)] struct CredentialEntry { @@ -83,7 +83,7 @@ impl CredentialManager { removed } - pub fn get_trusted_pubkeys(&self) -> Vec { + pub fn get_trusted_pubkeys(&self, network_secret: &str) -> Vec { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() @@ -94,14 +94,22 @@ impl CredentialManager { .unwrap() .values() .filter(|e| e.expiry_unix > now) - .map(|e| TrustedCredentialPubkey { - pubkey: Self::decode_pubkey_b64(&e.pubkey).unwrap_or_default(), - groups: e.groups.clone(), - allow_relay: e.allow_relay, - expiry_unix: e.expiry_unix, - allowed_proxy_cidrs: e.allowed_proxy_cidrs.clone(), + .map(|e| { + let credential = TrustedCredentialPubkey { + pubkey: Self::decode_pubkey_b64(&e.pubkey).unwrap_or_default(), + groups: e.groups.clone(), + allow_relay: e.allow_relay, + expiry_unix: e.expiry_unix, + allowed_proxy_cidrs: e.allowed_proxy_cidrs.clone(), + }; + TrustedCredentialPubkeyProof::new_signed(credential, network_secret) + }) + .filter(|e| { + e.credential + .as_ref() + .map(|x| !x.pubkey.is_empty()) + .unwrap_or(false) }) - .filter(|e| !e.pubkey.is_empty()) .collect() } @@ -202,13 +210,16 @@ mod tests { let pubkey_bytes = PublicKey::from(&private).as_bytes().to_vec(); assert!(mgr.is_pubkey_trusted(&pubkey_bytes)); - let trusted = mgr.get_trusted_pubkeys(); + let trusted = mgr.get_trusted_pubkeys("sec"); assert_eq!(trusted.len(), 1); - assert_eq!(trusted[0].groups, vec!["guest".to_string()]); + assert_eq!( + trusted[0].credential.as_ref().unwrap().groups, + vec!["guest".to_string()] + ); assert!(mgr.revoke_credential(&id)); assert!(!mgr.is_pubkey_trusted(&pubkey_bytes)); - assert!(mgr.get_trusted_pubkeys().is_empty()); + assert!(mgr.get_trusted_pubkeys("sec").is_empty()); } #[test] @@ -221,7 +232,7 @@ mod tests { let private = StaticSecret::from(privkey_bytes); let pubkey_bytes = PublicKey::from(&private).as_bytes().to_vec(); assert!(!mgr.is_pubkey_trusted(&pubkey_bytes)); - assert!(mgr.get_trusted_pubkeys().is_empty()); + assert!(mgr.get_trusted_pubkeys("sec").is_empty()); } #[test] @@ -300,12 +311,15 @@ mod tests { assert!(!mgr.is_pubkey_trusted(&pk1)); assert!(mgr.is_pubkey_trusted(&pk2)); - let trusted = mgr.get_trusted_pubkeys(); + let trusted = mgr.get_trusted_pubkeys("sec"); assert_eq!(trusted.len(), 1); - assert_eq!(trusted[0].groups, vec!["group2".to_string()]); - assert!(trusted[0].allow_relay); assert_eq!( - trusted[0].allowed_proxy_cidrs, + trusted[0].credential.as_ref().unwrap().groups, + vec!["group2".to_string()] + ); + assert!(trusted[0].credential.as_ref().unwrap().allow_relay); + assert_eq!( + trusted[0].credential.as_ref().unwrap().allowed_proxy_cidrs, vec!["10.0.0.0/8".to_string()] ); } @@ -320,20 +334,29 @@ mod tests { Duration::from_secs(7200), ); - let trusted = mgr.get_trusted_pubkeys(); + let trusted = mgr.get_trusted_pubkeys("sec"); assert_eq!(trusted.len(), 1); let tc = &trusted[0]; - assert_eq!(tc.groups, vec!["admin".to_string(), "ops".to_string()]); - assert!(tc.allow_relay); assert_eq!( - tc.allowed_proxy_cidrs, + tc.credential.as_ref().unwrap().groups, + vec!["admin".to_string(), "ops".to_string()] + ); + assert!(tc.credential.as_ref().unwrap().allow_relay); + assert_eq!( + tc.credential.as_ref().unwrap().allowed_proxy_cidrs, vec!["192.168.0.0/16".to_string(), "10.0.0.0/8".to_string()] ); - assert!(tc.expiry_unix > 0); + assert!(tc.credential.as_ref().unwrap().expiry_unix > 0); + assert!(tc.verify_credential_hmac("sec")); + assert!(tc + .credential + .as_ref() + .map(|x| !x.pubkey.is_empty()) + .unwrap_or(false)); let sk: [u8; 32] = BASE64_STANDARD.decode(&secret).unwrap().try_into().unwrap(); let pk = PublicKey::from(&StaticSecret::from(sk)).as_bytes().to_vec(); - assert_eq!(tc.pubkey, pk); + assert_eq!(tc.credential.as_ref().unwrap().pubkey, pk); } #[test] diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 1004b267..903b713d 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -34,7 +34,7 @@ use crate::{ proto::{ api::instance::{ForeignNetworkEntryPb, ListForeignNetworkResponse, PeerInfo}, common::LimiterConfig, - peer_rpc::DirectConnectorRpcServer, + peer_rpc::{DirectConnectorRpcServer, PeerIdentityType}, }, tunnel::packet_def::{PacketType, ZCPacket}, use_global_var, @@ -94,6 +94,7 @@ impl ForeignNetworkEntry { my_peer_id: PeerId, global_ctx: ArcGlobalCtx, relay_data: bool, + peer_session_store: Arc, pm_packet_sender: PacketRecvChan, ) -> Self { let stats_mgr = global_ctx.stats_manager().clone(); @@ -107,9 +108,9 @@ impl ForeignNetworkEntry { foreign_global_ctx.clone(), my_peer_id, )); - let peer_session_store = Arc::new(PeerSessionStore::new()); let relay_peer_map = RelayPeerMap::new( peer_map.clone(), + None, foreign_global_ctx.clone(), my_peer_id, peer_session_store.clone(), @@ -181,6 +182,7 @@ impl ForeignNetworkEntry { PUBLIC_SERVER_HOSTNAME_PREFIX, global_ctx.get_hostname() ))); + config.set_secure_mode(global_ctx.config.get_secure_mode()); let mut flags = config.get_flags(); flags.disable_relay_kcp = !global_ctx.get_flags().enable_relay_foreign_network_kcp; @@ -349,42 +351,64 @@ impl ForeignNetworkEntry { .get_counter(MetricName::TrafficPacketsRx, label_set.clone()); self.tasks.lock().await.spawn(async move { - while let Ok(zc_packet) = recv_packet_from_chan(&mut recv).await { + while let Ok(mut zc_packet) = recv_packet_from_chan(&mut recv).await { let buf_len = zc_packet.buf_len(); let Some(hdr) = zc_packet.peer_manager_header() else { tracing::warn!("invalid packet, skip"); continue; }; tracing::trace!(?hdr, "recv packet in foreign network manager"); + let from_peer_id = hdr.from_peer_id.get(); + let packet_type = hdr.packet_type; + let len = hdr.len.get(); let to_peer_id = hdr.to_peer_id.get(); if to_peer_id == my_node_id { - if hdr.packet_type == PacketType::RelayHandshake as u8 - || hdr.packet_type == PacketType::RelayHandshakeAck as u8 + if packet_type == PacketType::RelayHandshake as u8 + || packet_type == PacketType::RelayHandshakeAck as u8 { let _ = relay_peer_map.handle_handshake_packet(zc_packet).await; continue; } - if hdr.packet_type == PacketType::TaRpc as u8 - || hdr.packet_type == PacketType::RpcReq as u8 - || hdr.packet_type == PacketType::RpcResp as u8 + + if !peer_map.has_peer(from_peer_id) && relay_peer_map.is_secure_mode_enabled() { + match relay_peer_map.decrypt_if_needed(&mut zc_packet).await { + Ok(true) => {} + Ok(false) => { + tracing::error!("relay session not found"); + continue; + } + Err(e) => { + tracing::error!(?e, "relay decrypt failed"); + continue; + } + } + } + + if packet_type == PacketType::TaRpc as u8 + || packet_type == PacketType::RpcReq as u8 + || packet_type == PacketType::RpcResp as u8 { rx_bytes.add(buf_len as u64); rx_packets.inc(); rpc_sender.send(zc_packet).unwrap(); continue; } - tracing::trace!(?hdr, "ignore packet in foreign network"); + tracing::trace!( + ?packet_type, + ?len, + ?from_peer_id, + ?to_peer_id, + "ignore packet in foreign network" + ); } else { - if hdr.packet_type == PacketType::Data as u8 - || hdr.packet_type == PacketType::KcpSrc as u8 - || hdr.packet_type == PacketType::KcpDst as u8 - || hdr.packet_type == PacketType::RelayHandshake as u8 - || hdr.packet_type == PacketType::RelayHandshakeAck as u8 + if packet_type == PacketType::Data as u8 + || packet_type == PacketType::KcpSrc as u8 + || packet_type == PacketType::KcpDst as u8 { if !relay_data { continue; } - if !bps_limiter.try_consume(hdr.len.into()) { + if !bps_limiter.try_consume(len.into()) { continue; } } @@ -398,7 +422,19 @@ impl ForeignNetworkEntry { match gateway_peer_id { Some(peer_id) if peer_map.has_peer(peer_id) => { - if let Err(e) = peer_map.send_msg_directly(zc_packet, peer_id).await { + if peer_id != to_peer_id && hdr.from_peer_id.get() == my_node_id { + if let Err(e) = relay_peer_map + .send_msg(zc_packet, to_peer_id, NextHopPolicy::LeastHop) + .await + { + tracing::error!( + ?e, + "send packet to foreign peer inside relay peer map failed" + ); + } + } else if let Err(e) = + peer_map.send_msg_directly(zc_packet, peer_id).await + { tracing::error!( ?e, "send packet to foreign peer inside peer map failed" @@ -437,21 +473,10 @@ impl ForeignNetworkEntry { }); } - async fn run_peer_session_gc_routine(&self) { - let peer_session_store = self.peer_session_store.clone(); - self.tasks.lock().await.spawn(async move { - loop { - tokio::time::sleep(std::time::Duration::from_secs(60)).await; - peer_session_store.evict_unused_sessions(); - } - }); - } - async fn prepare(&self, accessor: Box) { self.prepare_route(accessor).await; self.start_packet_recv().await; self.run_relay_session_gc_routine().await; - self.run_peer_session_gc_routine().await; self.peer_rpc.run(); self.peer_center.init().await; } @@ -463,6 +488,8 @@ impl Drop for ForeignNetworkEntry { .rpc_server() .registry() .unregister_by_domain(&self.network.network_name); + self.global_ctx + .remove_trusted_keys(&self.network.network_name); tracing::debug!(self.my_peer_id, ?self.network, "drop foreign network entry"); } @@ -528,6 +555,7 @@ impl ForeignNetworkManagerData { self.network_peer_last_update.remove(network_name); } + #[allow(clippy::too_many_arguments)] async fn get_or_insert_entry( &self, network_identity: &NetworkIdentity, @@ -535,6 +563,7 @@ impl ForeignNetworkManagerData { dst_peer_id: PeerId, relay_data: bool, global_ctx: &ArcGlobalCtx, + peer_session_store: Arc, pm_packet_sender: &PacketRecvChan, ) -> (Arc, bool) { let mut new_added = false; @@ -550,6 +579,7 @@ impl ForeignNetworkManagerData { my_peer_id, global_ctx.clone(), relay_data, + peer_session_store, pm_packet_sender.clone(), )) }) @@ -578,6 +608,7 @@ pub const FOREIGN_NETWORK_SERVICE_ID: u32 = 1; pub struct ForeignNetworkManager { my_peer_id: PeerId, global_ctx: ArcGlobalCtx, + peer_session_store: Arc, packet_sender_to_mgr: PacketRecvChan, data: Arc, @@ -589,6 +620,7 @@ impl ForeignNetworkManager { pub fn new( my_peer_id: PeerId, global_ctx: ArcGlobalCtx, + peer_session_store: Arc, packet_sender_to_mgr: PacketRecvChan, accessor: Box, ) -> Self { @@ -606,6 +638,7 @@ impl ForeignNetworkManager { Self { my_peer_id, global_ctx, + peer_session_store, packet_sender_to_mgr, data, @@ -641,13 +674,15 @@ impl ForeignNetworkManager { peer_conn.get_peer_id(), ret.is_ok(), &self.global_ctx, + self.peer_session_store.clone(), &self.packet_sender_to_mgr, ) .await; let _g = entry.lock.lock().await; - if entry.network != peer_conn.get_network_identity() + if (entry.network != peer_conn.get_network_identity() + && peer_conn.get_peer_identity_type() != PeerIdentityType::SharedNode) || entry.my_peer_id != peer_conn.get_my_peer_id() { if new_added { @@ -770,7 +805,7 @@ impl ForeignNetworkManager { .map(|v| *v) } - pub async fn send_msg_to_peer( + pub async fn forward_foreign_network_packet( &self, network_name: &str, dst_peer_id: PeerId, @@ -778,7 +813,7 @@ impl ForeignNetworkManager { ) -> Result<(), Error> { if let Some(entry) = self.data.get_network_entry(network_name) { entry - .relay_peer_map + .peer_map .send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop) .await } else { diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index fc653e45..5c1491b1 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -690,7 +690,7 @@ impl PeerConn { /// | Admin | SharedNode | pinned key match | PeerVerified | EncryptedUnauthenticated | SharedNode | SharedNode | /// | Admin | SharedNode | local has no pinned key requirement | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | SharedNode | /// | Credential | SharedNode | no pin and not trusted | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | SharedNode | - /// | Credential | Credential | both keys trusted by admin distribution | PeerVerified | PeerVerified | Credential | Credential | + /// | Credential | Credential | should reject | handshake reject | handshake reject | unknown | unknown | /// /// Logic (in priority order): /// 1. **NetworkSecretConfirmed**: proof verification succeeds @@ -699,6 +699,7 @@ impl PeerConn { /// 3. **PeerVerified**: pubkey is in trusted list /// 4. **EncryptedUnauthenticated**: initiator without network_secret /// 5. **Reject**: none of the above + #[allow(clippy::too_many_arguments)] fn verify_remote_auth( &self, proof: Option<&[u8]>, @@ -707,6 +708,7 @@ impl PeerConn { pinned_pubkey: Option<&[u8]>, has_network_secret: bool, is_initiator: bool, + remote_network_name: &str, ) -> Result { // 1. Verify proof if let Some(proof) = proof { @@ -725,7 +727,11 @@ impl PeerConn { )); } // If no network_secret, pinned key must be in trusted list - if !has_network_secret && !self.global_ctx.is_pubkey_trusted(remote_pubkey) { + if !has_network_secret + && !self + .global_ctx + .is_pubkey_trusted(remote_pubkey, remote_network_name) + { return Err(Error::WaitRespError( "pinned pubkey not in trusted list".to_owned(), )); @@ -734,7 +740,10 @@ impl PeerConn { } // 3. Check if pubkey is in trusted list - if self.global_ctx.is_pubkey_trusted(remote_pubkey) { + if self + .global_ctx + .is_pubkey_trusted(remote_pubkey, remote_network_name) + { return Ok(SecureAuthLevel::PeerVerified); } @@ -903,6 +912,7 @@ impl PeerConn { pinned_remote_pubkey.as_deref(), network.network_secret.is_some(), true, // is_initiator + &remote_network_name, )? }; let peer_identity_type = self.classify_remote_identity( @@ -1154,6 +1164,7 @@ impl PeerConn { .network_secret .is_some(), false, // is_initiator + &remote_network_name, )? } else { SecureAuthLevel::EncryptedUnauthenticated diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index c1c48a9f..687f85d4 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -195,12 +195,6 @@ impl PeerManager { my_peer_id, )); let peer_session_store = Arc::new(PeerSessionStore::new()); - let relay_peer_map = RelayPeerMap::new( - peers.clone(), - global_ctx.clone(), - my_peer_id, - peer_session_store.clone(), - ); let encryptor = if global_ctx.get_flags().enable_encryption { // 只有在启用加密时才使用工厂函数选择算法 @@ -259,6 +253,7 @@ impl PeerManager { let foreign_network_manager = Arc::new(ForeignNetworkManager::new( my_peer_id, global_ctx.clone(), + peer_session_store.clone(), packet_send.clone(), Self::build_foreign_network_manager_accessor(&peers), )); @@ -269,6 +264,14 @@ impl PeerManager { my_peer_id, )); + let relay_peer_map = RelayPeerMap::new( + peers.clone(), + Some(foreign_network_client.clone()), + global_ctx.clone(), + my_peer_id, + peer_session_store.clone(), + ); + let data_compress_algo = global_ctx .get_flags() .data_compress_algo() @@ -619,7 +622,7 @@ impl PeerManager { MetricName::TrafficPacketsForeignForwardRx, ); if let Err(e) = foreign_network_mgr - .send_msg_to_peer( + .forward_foreign_network_packet( &foreign_network_name, foreign_peer_id, packet.foreign_network_packet(), @@ -816,8 +819,10 @@ impl PeerManager { tracing::error!(?e, "decrypt failed"); continue; } - } else if !peers.has_peer(from_peer_id) { - match relay_peer_map.decrypt_if_needed(&mut ret) { + } else if !peers.has_peer(from_peer_id) + && !foreign_client.has_next_hop(from_peer_id) + { + match relay_peer_map.decrypt_if_needed(&mut ret).await { Ok(true) => {} Ok(false) => { tracing::error!("relay session not found"); @@ -1180,13 +1185,13 @@ impl PeerManager { if peers.has_peer(dst_peer_id) { return peers.send_msg_directly(msg, dst_peer_id).await; + } else if foreign_network_client.has_next_hop(dst_peer_id) { + return foreign_network_client.send_msg(msg, dst_peer_id).await; } if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy.clone()).await { - if peers.has_peer(gateway) { + if peers.has_peer(gateway) || foreign_network_client.has_next_hop(gateway) { relay_peer_map.send_msg(msg, dst_peer_id, policy).await - } else if foreign_network_client.has_next_hop(gateway) { - foreign_network_client.send_msg(msg, gateway).await } else { tracing::warn!( ?gateway, diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index dc8a0b22..386c00ab 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -101,6 +101,18 @@ fn cidr_is_subset_str(child: &str, parent: &str) -> bool { } } +/// Patch specific fields in a raw DynamicMessage from a decoded RoutePeerInfo, +/// preserving all other fields (including unknown ones). +fn patch_raw_from_info(raw: &mut DynamicMessage, info: &RoutePeerInfo, fields: &[&str]) { + let mut decoded_raw = DynamicMessage::new(RoutePeerInfo::default().descriptor()); + decoded_raw.transcode_from(info).unwrap(); + for field_name in fields { + if let Some(value) = decoded_raw.get_field_by_name(field_name) { + raw.set_field_by_name(field_name, value.into_owned()); + } + } +} + #[derive(Debug, Clone)] struct AtomicVersion(Arc); @@ -229,12 +241,12 @@ impl RoutePeerInfo { noise_static_pubkey, // Only admin nodes (holding network_secret) publish trusted credential pubkeys - trusted_credential_pubkeys: if global_ctx - .get_network_identity() - .network_secret - .is_some() + trusted_credential_pubkeys: if let Some(network_secret) = + global_ctx.get_network_identity().network_secret { - global_ctx.get_credential_manager().get_trusted_pubkeys() + global_ctx + .get_credential_manager() + .get_trusted_pubkeys(&network_secret) } else { Vec::new() }, @@ -886,6 +898,7 @@ impl SyncedRouteInfo { /// Also returns a HashMap of trusted keys for synchronization to GlobalCtx. fn verify_and_update_credential_trusts( &self, + network_secret: Option<&str>, ) -> ( Vec, HashMap, crate::common::global_ctx::TrustedKeyMetadata>, @@ -919,7 +932,19 @@ impl SyncedRouteInfo { }, ); } - for tc in &info.trusted_credential_pubkeys { + for proof in &info.trusted_credential_pubkeys { + // If we have a network_secret, verify the HMAC as before. + // If we don't (e.g. credential nodes), accept proofs from admin peers + // based on the authenticated channel instead of local HMAC verification. + let hmac_valid = network_secret + .map(|secret| proof.verify_credential_hmac(secret)) + .unwrap_or(true); + if !hmac_valid { + continue; + } + let Some(tc) = proof.credential.as_ref() else { + continue; + }; if tc.expiry_unix > now { all_trusted .entry(tc.pubkey.clone()) @@ -2258,9 +2283,13 @@ impl PeerRouteServiceImpl { let my_foreign_network_updated = self.update_my_foreign_network().await; let mut untrusted_changed = false; if my_peer_info_updated { - let (untrusted, global_trusted_keys) = - self.synced_route_info.verify_and_update_credential_trusts(); - self.global_ctx.update_trusted_keys(global_trusted_keys); + let network_identity = self.global_ctx.get_network_identity(); + let network_secret = network_identity.network_secret.as_deref(); + let (untrusted, global_trusted_keys) = self + .synced_route_info + .verify_and_update_credential_trusts(network_secret); + self.global_ctx + .update_trusted_keys(global_trusted_keys, &network_identity.network_name); self.disconnect_untrusted_peers(&untrusted).await; untrusted_changed = !untrusted.is_empty(); } @@ -2913,6 +2942,7 @@ impl RouteSessionManager { .await .unwrap_or(PeerIdentityType::Admin); let from_is_credential = matches!(from_identity_type, PeerIdentityType::Credential); + let from_is_shared = matches!(from_identity_type, PeerIdentityType::SharedNode); let _session_lock = session.lock.lock(); @@ -2958,8 +2988,18 @@ impl RouteSessionManager { info }) .collect(); - normalized_raw_peer_infos = - normalized_peer_infos.iter().map(normalize_raw).collect(); + normalized_raw_peer_infos = normalized_peer_infos + .iter() + .map(|info| { + // Find original raw for this peer to preserve unknown fields + let orig_idx = peer_infos.iter().position(|p| p.peer_id == info.peer_id); + let mut raw = orig_idx + .and_then(|idx| raw_peer_infos.as_ref().map(|rpi| rpi[idx].clone())) + .unwrap_or_else(|| normalize_raw(info)); + patch_raw_from_info(&mut raw, info, &["proxy_cidrs", "feature_flag"]); + raw + }) + .collect(); (&normalized_peer_infos, &normalized_raw_peer_infos) } else { let mut peer_infos_mut = peer_infos.clone(); @@ -2967,6 +3007,13 @@ impl RouteSessionManager { .as_ref() .cloned() .unwrap_or_else(|| peer_infos_mut.iter().map(normalize_raw).collect()); + if from_is_shared { + for (info, raw) in peer_infos_mut.iter_mut().zip(raw_peer_infos_mut.iter_mut()) + { + info.trusted_credential_pubkeys.clear(); + patch_raw_from_info(raw, info, &["trusted_credential_pubkeys"]); + } + } if let Some((idx, info)) = peer_infos_mut .iter() .enumerate() @@ -2975,7 +3022,7 @@ impl RouteSessionManager { let mut info = info.clone(); SyncedRouteInfo::mark_credential_peer(&mut info, false); peer_infos_mut[idx] = info.clone(); - raw_peer_infos_mut[idx] = normalize_raw(&info); + patch_raw_from_info(&mut raw_peer_infos_mut[idx], &info, &["feature_flag"]); } normalized_peer_infos = peer_infos_mut; normalized_raw_peer_infos = raw_peer_infos_mut; @@ -3019,14 +3066,15 @@ impl RouteSessionManager { if need_update_route_table { // Run credential verification and update route table + let network_identity = service_impl.global_ctx.get_network_identity(); let (untrusted, global_trusted_keys) = service_impl .synced_route_info - .verify_and_update_credential_trusts(); + .verify_and_update_credential_trusts(network_identity.network_secret.as_deref()); untrusted_peers = untrusted; // Sync trusted keys to GlobalCtx for handshake verification service_impl .global_ctx - .update_trusted_keys(global_trusted_keys); + .update_trusted_keys(global_trusted_keys, &network_identity.network_name); service_impl.update_route_table_and_cached_local_conn_bitmap(); } @@ -3424,7 +3472,7 @@ mod tests { common::{NatType, PeerFeatureFlag}, peer_rpc::{ PeerIdentityType, RoutePeerInfo, RoutePeerInfos, SyncRouteInfoRequest, - TrustedCredentialPubkey, + TrustedCredentialPubkey, TrustedCredentialPubkeyProof, }, }, tunnel::common::tests::wait_for_condition, @@ -3538,6 +3586,7 @@ mod tests { #[tokio::test] async fn trusted_credentials_only_from_admin_publishers() { let service_impl = PeerRouteServiceImpl::new(1, get_mock_global_ctx()); + let network_secret = "sec1"; let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() @@ -3553,11 +3602,14 @@ mod tests { is_credential_peer: false, ..Default::default() }); - admin_info.trusted_credential_pubkeys = vec![TrustedCredentialPubkey { - pubkey: admin_key.clone(), - expiry_unix: now + 600, - ..Default::default() - }]; + admin_info.trusted_credential_pubkeys = vec![TrustedCredentialPubkeyProof::new_signed( + TrustedCredentialPubkey { + pubkey: admin_key.clone(), + expiry_unix: now + 600, + ..Default::default() + }, + network_secret, + )]; let mut credential_info = RoutePeerInfo::new(); credential_info.peer_id = 21; @@ -3566,11 +3618,15 @@ mod tests { is_credential_peer: true, ..Default::default() }); - credential_info.trusted_credential_pubkeys = vec![TrustedCredentialPubkey { - pubkey: credential_key.clone(), - expiry_unix: now + 600, - ..Default::default() - }]; + credential_info.trusted_credential_pubkeys = + vec![TrustedCredentialPubkeyProof::new_signed( + TrustedCredentialPubkey { + pubkey: credential_key.clone(), + expiry_unix: now + 600, + ..Default::default() + }, + network_secret, + )]; { let mut guard = service_impl.synced_route_info.peer_infos.write(); @@ -3580,7 +3636,7 @@ mod tests { service_impl .synced_route_info - .verify_and_update_credential_trusts(); + .verify_and_update_credential_trusts(Some(network_secret)); assert!(service_impl .synced_route_info @@ -3647,6 +3703,72 @@ mod tests { assert!(guard.get(&forwarded_peer_id).is_none()); } + #[tokio::test] + async fn sync_route_info_shared_sender_cannot_publish_trusted_credentials() { + let peer_mgr = create_mock_pmgr().await; + let route = create_mock_route(peer_mgr.clone()).await; + let from_peer_id: PeerId = 10021; + let forwarded_peer_id: PeerId = 10022; + let credential_key = vec![9u8; 32]; + + let identity_type = DashMap::new(); + identity_type.insert(from_peer_id, PeerIdentityType::SharedNode); + *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { + my_peer_id: peer_mgr.my_peer_id(), + identity_type, + })); + + let mut sender_info = RoutePeerInfo::new(); + sender_info.peer_id = from_peer_id; + sender_info.version = 1; + + let mut forwarded_info = RoutePeerInfo::new(); + forwarded_info.peer_id = forwarded_peer_id; + forwarded_info.version = 1; + forwarded_info.trusted_credential_pubkeys = vec![TrustedCredentialPubkeyProof { + credential: Some(TrustedCredentialPubkey { + pubkey: credential_key.clone(), + expiry_unix: i64::MAX, + ..Default::default() + }), + credential_hmac: vec![1; 32], + }]; + + let make_raw = |info: &RoutePeerInfo| { + let mut raw = DynamicMessage::new(RoutePeerInfo::default().descriptor()); + raw.transcode_from(info).unwrap(); + raw + }; + let raw_infos = vec![make_raw(&sender_info), make_raw(&forwarded_info)]; + + route + .session_mgr + .do_sync_route_info( + from_peer_id, + 1, + true, + Some(vec![sender_info, forwarded_info]), + Some(raw_infos), + None, + None, + ) + .await + .unwrap(); + + let guard = route.service_impl.synced_route_info.peer_infos.read(); + assert!(guard + .get(&forwarded_peer_id) + .map(|x| x.trusted_credential_pubkeys.is_empty()) + .unwrap_or(false)); + drop(guard); + + assert!(!route + .service_impl + .synced_route_info + .trusted_credential_pubkeys + .contains_key(&credential_key)); + } + #[tokio::test] async fn sync_route_info_forces_non_credential_for_legacy_admin_sender() { let peer_mgr = create_mock_pmgr().await; @@ -4298,4 +4420,197 @@ mod tests { connect_peer_manager(p_b.clone(), p_c.clone()).await; wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap(); } + + /// Helper: create a raw DynamicMessage from a RoutePeerInfo with an extra + /// unknown field appended (field number 9999, varint value 42). + /// Returns the raw DynamicMessage and the encoded unknown field bytes. + fn make_raw_with_unknown_field(info: &RoutePeerInfo) -> (DynamicMessage, Vec) { + // Encode the info to bytes + let mut bytes = info.encode_to_vec(); + // Append an unknown field: field 9999, wire type 0 (varint), value 42 + // Tag = (9999 << 3) | 0 = 79992, encoded as varint + prost::encoding::encode_key(9999, prost::encoding::WireType::Varint, &mut bytes); + prost::encoding::encode_varint(42, &mut bytes); + let unknown_field_bytes = bytes[info.encoded_len()..].to_vec(); + // Decode as DynamicMessage — unknown fields are preserved + let raw = DynamicMessage::decode(RoutePeerInfo::default().descriptor(), bytes.as_slice()) + .unwrap(); + (raw, unknown_field_bytes) + } + + /// Check that a raw DynamicMessage still contains the unknown field bytes + /// by re-encoding and checking the suffix. + fn raw_has_unknown_bytes(raw: &DynamicMessage, unknown_bytes: &[u8]) -> bool { + let encoded = raw.encode_to_vec(); + // The unknown field bytes should appear somewhere in the encoded output + encoded + .windows(unknown_bytes.len()) + .any(|w| w == unknown_bytes) + } + + #[tokio::test] + async fn sync_route_preserves_unknown_fields_for_credential_sender() { + let peer_mgr = create_mock_pmgr().await; + let route = create_mock_route(peer_mgr.clone()).await; + let from_peer_id: PeerId = 20001; + + let identity_type = DashMap::new(); + identity_type.insert(from_peer_id, PeerIdentityType::Credential); + *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { + my_peer_id: peer_mgr.my_peer_id(), + identity_type, + })); + + let mut sender_info = RoutePeerInfo::new(); + sender_info.peer_id = from_peer_id; + sender_info.version = 1; + + let (raw, unknown_bytes) = make_raw_with_unknown_field(&sender_info); + + route + .session_mgr + .do_sync_route_info( + from_peer_id, + 1, + true, + Some(vec![sender_info]), + Some(vec![raw]), + None, + None, + ) + .await + .unwrap(); + + let stored_raw = route + .service_impl + .synced_route_info + .raw_peer_infos + .get(&from_peer_id) + .expect("raw peer info should be stored"); + assert!( + raw_has_unknown_bytes(stored_raw.value(), &unknown_bytes), + "unknown fields should be preserved for credential sender" + ); + } + + #[tokio::test] + async fn sync_route_preserves_unknown_fields_for_shared_sender() { + let peer_mgr = create_mock_pmgr().await; + let route = create_mock_route(peer_mgr.clone()).await; + let from_peer_id: PeerId = 20011; + let forwarded_peer_id: PeerId = 20012; + + let identity_type = DashMap::new(); + identity_type.insert(from_peer_id, PeerIdentityType::SharedNode); + *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { + my_peer_id: peer_mgr.my_peer_id(), + identity_type, + })); + + let mut sender_info = RoutePeerInfo::new(); + sender_info.peer_id = from_peer_id; + sender_info.version = 1; + + let mut forwarded_info = RoutePeerInfo::new(); + forwarded_info.peer_id = forwarded_peer_id; + forwarded_info.version = 1; + forwarded_info.trusted_credential_pubkeys = vec![TrustedCredentialPubkeyProof { + credential: Some(TrustedCredentialPubkey { + pubkey: vec![9u8; 32], + expiry_unix: i64::MAX, + ..Default::default() + }), + credential_hmac: vec![1; 32], + }]; + + let (raw_sender, unknown_sender) = make_raw_with_unknown_field(&sender_info); + let (raw_forwarded, unknown_forwarded) = make_raw_with_unknown_field(&forwarded_info); + + route + .session_mgr + .do_sync_route_info( + from_peer_id, + 1, + true, + Some(vec![sender_info, forwarded_info]), + Some(vec![raw_sender, raw_forwarded]), + None, + None, + ) + .await + .unwrap(); + + // Shared node: trusted_credential_pubkeys cleared but unknown fields preserved + let stored_sender = route + .service_impl + .synced_route_info + .raw_peer_infos + .get(&from_peer_id) + .expect("sender raw should be stored"); + assert!( + raw_has_unknown_bytes(stored_sender.value(), &unknown_sender), + "unknown fields should be preserved for shared sender's own info" + ); + + let stored_forwarded = route + .service_impl + .synced_route_info + .raw_peer_infos + .get(&forwarded_peer_id) + .expect("forwarded raw should be stored"); + assert!( + raw_has_unknown_bytes(stored_forwarded.value(), &unknown_forwarded), + "unknown fields should be preserved for shared sender's forwarded info" + ); + } + + #[tokio::test] + async fn sync_route_preserves_unknown_fields_for_admin_sender() { + let peer_mgr = create_mock_pmgr().await; + let route = create_mock_route(peer_mgr.clone()).await; + let from_peer_id: PeerId = 20021; + + let identity_type = DashMap::new(); + identity_type.insert(from_peer_id, PeerIdentityType::Admin); + *route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface { + my_peer_id: peer_mgr.my_peer_id(), + identity_type, + })); + + let mut sender_info = RoutePeerInfo::new(); + sender_info.peer_id = from_peer_id; + sender_info.version = 1; + // Set is_credential_peer=true so the mark_credential_peer(false) path triggers + sender_info.feature_flag = Some(PeerFeatureFlag { + is_credential_peer: true, + ..Default::default() + }); + + let (raw, unknown_bytes) = make_raw_with_unknown_field(&sender_info); + + route + .session_mgr + .do_sync_route_info( + from_peer_id, + 1, + true, + Some(vec![sender_info]), + Some(vec![raw]), + None, + None, + ) + .await + .unwrap(); + + let stored_raw = route + .service_impl + .synced_route_info + .raw_peer_infos + .get(&from_peer_id) + .expect("raw peer info should be stored"); + assert!( + raw_has_unknown_bytes(stored_raw.value(), &unknown_bytes), + "unknown fields should be preserved for admin sender (mark non-credential path)" + ); + } } diff --git a/easytier/src/peers/peer_session.rs b/easytier/src/peers/peer_session.rs index 3e1a7c95..decd6517 100644 --- a/easytier/src/peers/peer_session.rs +++ b/easytier/src/peers/peer_session.rs @@ -36,7 +36,7 @@ pub enum PeerSessionAction { Create, } -#[derive(PartialEq, Clone, Eq, Hash)] +#[derive(PartialEq, Clone, Eq, Hash, Debug)] pub struct SessionKey { network_name: String, peer_id: PeerId, @@ -95,6 +95,7 @@ impl PeerSessionStore { .retain(|_key, session| Arc::strong_count(session) > 1); } + #[tracing::instrument(skip(self))] pub fn upsert_responder_session( &self, key: &SessionKey, @@ -103,6 +104,7 @@ impl PeerSessionStore { recv_algorithm: String, peer_static_pubkey: Option<[u8; 32]>, ) -> Result { + tracing::event!(tracing::Level::INFO, "upsert_responder_session {:?}", key); let existing = self .sessions .get(key) @@ -159,6 +161,7 @@ impl PeerSessionStore { } #[allow(clippy::too_many_arguments)] + #[tracing::instrument(skip(self))] pub fn apply_initiator_action( &self, key: &SessionKey, @@ -170,12 +173,7 @@ impl PeerSessionStore { recv_algorithm: String, peer_static_pubkey: Option<[u8; 32]>, ) -> Result, anyhow::Error> { - tracing::info!( - "apply_initiator_action {:?}, send_algorithm: {}, recv_algorithm: {}", - action, - send_algorithm, - recv_algorithm - ); + tracing::event!(tracing::Level::INFO, "apply_initiator_action {:?}", key); match action { PeerSessionAction::Join => { let Some(session) = self.get(key) else { @@ -301,9 +299,9 @@ impl ReplayWindow256 { if bit_shift > 0 { let mut carry = 0u8; - for b in self.bitmap.iter_mut().rev() { - let new_carry = *b << (8 - bit_shift); - *b = (*b >> bit_shift) | carry; + for b in self.bitmap.iter_mut() { + let new_carry = *b >> (8 - bit_shift); + *b = (*b << bit_shift) | carry; carry = new_carry; } } @@ -560,12 +558,7 @@ impl PeerSession { { let mut rx = self.rx_slots.lock().unwrap(); for dir in 0..2 { - rx[dir][0] = EpochRxSlot { - epoch: initial_epoch, - window: ReplayWindow256::default(), - last_rx_ms: 0, - valid: true, - }; + rx[dir][0].clear(); rx[dir][1].clear(); } } @@ -816,7 +809,11 @@ impl PeerSession { let now_ms = now_ms(); if !self.check_replay(epoch, seq, dir, now_ms) { - return Err(anyhow!("replay rejected")); + return Err(anyhow!( + "replay rejected, sender_peer_id: {:?}, receiver_peer_id: {:?}", + sender_peer_id, + receiver_peer_id + )); } let encryptor = self @@ -919,4 +916,71 @@ mod tests { assert!(s.check_replay(1, 1, 0, now + 1)); assert!(s.check_replay(1, 2, 0, now + 2)); } + + #[test] + fn replay_window_shift_preserves_bits() { + let mut w = ReplayWindow256::default(); + // Accept seqs 0..10 + for i in 0..10u64 { + assert!(w.accept(i), "seq {i} should be accepted"); + } + assert_eq!(w.max_seq, 9); + + // All seqs 0..10 should be marked as seen (replay) + for i in 0..10u64 { + assert!(!w.accept(i), "seq {i} should be rejected as replay"); + } + + // Seq 10 should still be accepted + assert!(w.accept(10)); + } + + #[test] + fn replay_window_out_of_order_within_window() { + let mut w = ReplayWindow256::default(); + // Accept even seqs 0,2,4,...,20 + for i in (0..=20u64).step_by(2) { + assert!(w.accept(i), "seq {i} should be accepted"); + } + // Now accept odd seqs 1,3,5,...,19 (out of order, within window) + for i in (1..=19u64).step_by(2) { + assert!(w.accept(i), "seq {i} should be accepted (out of order)"); + } + // All seqs 0..=20 should now be marked as seen + for i in 0..=20u64 { + assert!(!w.accept(i), "seq {i} should be rejected as replay"); + } + } + + #[test] + fn sync_root_key_allows_any_epoch_from_remote() { + // After sync_root_key, the remote peer may still be sending at an + // old epoch. The receiver should accept those packets. + let peer_id: PeerId = 10; + let root_key = PeerSession::new_root_key(); + let s = PeerSession::new( + peer_id, + root_key, + 1, + 0, + "aes-256-gcm".to_string(), + "aes-256-gcm".to_string(), + None, + ); + + // Simulate receiving some packets at epoch 0 + let now = now_ms(); + assert!(s.check_replay(0, 0, 0, now)); + assert!(s.check_replay(0, 1, 0, now)); + + // Sync with initial_epoch=2 (simulating a Sync action) + s.sync_root_key(root_key, 2, 2); + + // Remote peer is still sending at epoch 0 — should be accepted + // (rx_slots were cleared, so the first packet establishes the epoch) + assert!( + s.check_replay(0, 10, 0, now + 1), + "packets at old epoch should be accepted after sync" + ); + } } diff --git a/easytier/src/peers/relay_peer_map.rs b/easytier/src/peers/relay_peer_map.rs index be8e274e..bc8e377a 100644 --- a/easytier/src/peers/relay_peer_map.rs +++ b/easytier/src/peers/relay_peer_map.rs @@ -1,12 +1,12 @@ use std::{sync::Arc, time::Instant}; use dashmap::DashMap; -use hmac::Mac; use prost::Message; use snow::params::NoiseParams; use tokio::sync::{oneshot, Mutex, OwnedMutexGuard}; use tokio::time::{timeout, Duration}; +use crate::peers::foreign_network_client::ForeignNetworkClient; use crate::{ common::error::Error, common::{global_ctx::ArcGlobalCtx, PeerId}, @@ -43,6 +43,7 @@ impl Default for RelayPeerState { pub struct RelayPeerMap { peer_map: Arc, + foreign_network_client: Option>, global_ctx: ArcGlobalCtx, my_peer_id: PeerId, peer_session_store: Arc, @@ -50,17 +51,26 @@ pub struct RelayPeerMap { pending_handshakes: DashMap>, handshake_locks: DashMap>>, pub(crate) pending_packets: DashMap>, + + is_secure_mode_enabled: bool, } impl RelayPeerMap { pub fn new( peer_map: Arc, + foreign_network_client: Option>, global_ctx: ArcGlobalCtx, my_peer_id: PeerId, peer_session_store: Arc, ) -> Arc { + let is_secure_mode_enabled = global_ctx + .config + .get_secure_mode() + .map(|cfg| cfg.enabled) + .unwrap_or(false); Arc::new(Self { peer_map, + foreign_network_client, global_ctx, my_peer_id, peer_session_store, @@ -68,15 +78,12 @@ impl RelayPeerMap { pending_handshakes: DashMap::new(), handshake_locks: DashMap::new(), pending_packets: DashMap::new(), + is_secure_mode_enabled, }) } - fn is_secure_mode_enabled(&self) -> bool { - self.global_ctx - .config - .get_secure_mode() - .map(|cfg| cfg.enabled) - .unwrap_or(false) + pub fn is_secure_mode_enabled(&self) -> bool { + self.is_secure_mode_enabled } fn get_local_keypair(&self) -> Result<(Vec, Vec), Error> { @@ -134,12 +141,19 @@ impl RelayPeerMap { policy: NextHopPolicy, ) -> Result<(), Error> { let Some(next_hop) = self.peer_map.get_gateway_peer_id(dst_peer_id, policy).await else { - return Err(Error::RouteError(None)); + return Err(Error::RouteError(Some(format!( + "next hop not found in route for peer {dst_peer_id:?}" + )))); }; - if !self.peer_map.has_peer(next_hop) { - return Err(Error::RouteError(None)); + if self.peer_map.has_peer(next_hop) { + self.peer_map.send_msg_directly(msg, next_hop).await + } else if let Some(foreign_network_client) = &self.foreign_network_client { + foreign_network_client.send_msg(msg, next_hop).await + } else { + Err(Error::RouteError(Some(format!( + "next hop not found in direct peer map: {next_hop:?}" + )))) } - self.peer_map.send_msg_directly(msg, next_hop).await } pub async fn send_msg( @@ -321,7 +335,6 @@ impl RelayPeerMap { let a_conn_id = uuid::Uuid::new_v4(); let msg1_pb = RelayNoiseMsg1Pb { version: RELAY_NOISE_VERSION, - a_network_name: network.network_name.clone(), a_session_generation, a_conn_id: Some(a_conn_id.into()), client_encryption_algorithm: self.global_ctx.get_flags().encryption_algorithm.clone(), @@ -331,7 +344,6 @@ impl RelayPeerMap { let out_len = hs .write_message(&payload, &mut out) .map_err(|e| Error::RouteError(Some(format!("noise write msg1 failed: {e:?}"))))?; - let server_handshake_hash = hs.get_handshake_hash().to_vec(); let (tx, rx) = oneshot::channel(); self.pending_handshakes.insert(dst_peer_id, tx); @@ -374,27 +386,6 @@ impl RelayPeerMap { "relay msg2 conn_id_echo mismatch".to_string(), ))); } - if msg2_pb.b_network_name == network.network_name { - if msg2_pb.role_hint != 1 { - return Err(Error::RouteError(Some( - "role_hint must be 1 when network_name is same".to_string(), - ))); - } - let Some(secret_proof_32) = msg2_pb.secret_proof_32 else { - return Err(Error::RouteError(Some( - "secret_proof_32 must be present when role_hint is 1".to_string(), - ))); - }; - let verify_result = self - .global_ctx - .get_secret_proof(&server_handshake_hash) - .map(|mac| mac.verify_slice(&secret_proof_32).is_ok()); - if verify_result != Some(true) { - return Err(Error::RouteError(Some( - "secret_proof_32 verify failed".to_string(), - ))); - } - } let action = PeerConnSessionActionPb::try_from(msg2_pb.action) .map_err(|_| Error::RouteError(Some("invalid session action".to_string())))?; @@ -539,7 +530,6 @@ impl RelayPeerMap { &mut hs, msg1, )?; - let remote_network_name = msg1_pb.a_network_name.clone(); let remote_static = hs .get_remote_static() .map(|x: &[u8]| x.to_vec()) @@ -564,16 +554,6 @@ impl RelayPeerMap { } let server_network_name = self.global_ctx.get_network_name(); - let (role_hint, secret_proof_32) = if remote_network_name == server_network_name { - let proof = self - .global_ctx - .get_secret_proof(hs.get_handshake_hash()) - .map(|mac| mac.finalize().into_bytes().to_vec()); - (1, proof) - } else { - (2, None) - }; - let algo = self.global_ctx.get_flags().encryption_algorithm.clone(); let key = SessionKey::new(server_network_name.clone(), remote_peer_id); let upsert = self @@ -587,8 +567,6 @@ impl RelayPeerMap { ) .map_err(|e| Error::RouteError(Some(format!("{e:?}"))))?; let msg2_pb = RelayNoiseMsg2Pb { - b_network_name: server_network_name, - role_hint, action: match upsert.action { PeerSessionAction::Join => PeerConnSessionActionPb::Join as i32, PeerSessionAction::Sync => PeerConnSessionActionPb::Sync as i32, @@ -599,7 +577,6 @@ impl RelayPeerMap { initial_epoch: upsert.initial_epoch, b_conn_id: Some(uuid::Uuid::new_v4().into()), a_conn_id_echo: msg1_pb.a_conn_id, - secret_proof_32, server_encryption_algorithm: algo, }; let payload = msg2_pb.encode_to_vec(); @@ -625,7 +602,7 @@ impl RelayPeerMap { Ok(()) } - pub fn decrypt_if_needed(&self, packet: &mut ZCPacket) -> Result { + pub async fn decrypt_if_needed(self: &Arc, packet: &mut ZCPacket) -> Result { if !self.is_secure_mode_enabled() { return Ok(false); } @@ -636,6 +613,12 @@ impl RelayPeerMap { let network = self.global_ctx.get_network_identity(); let key = SessionKey::new(network.network_name.clone(), from_peer_id); let Some(session) = self.peer_session_store.get(&key) else { + tracing::debug!( + "relay session not found for peer {}, try handshake", + from_peer_id + ); + self.ensure_session(from_peer_id, NextHopPolicy::LeastHop) + .await?; return Ok(false); }; let now = Instant::now(); diff --git a/easytier/src/peers/rpc_service.rs b/easytier/src/peers/rpc_service.rs index 4263f79b..61e1cda4 100644 --- a/easytier/src/peers/rpc_service.rs +++ b/easytier/src/peers/rpc_service.rs @@ -254,6 +254,11 @@ impl CredentialManageRpc for PeerManagerRpcService { ) -> Result { let pm = weak_upgrade(&self.peer_manager)?; let global_ctx = pm.get_global_ctx(); + if global_ctx.get_network_identity().network_secret.is_none() { + return Err(rpc_types::error::Error::ExecutionError(anyhow::anyhow!( + "only admin nodes (with network_secret) can revoke credentials" + ))); + } let success = global_ctx .get_credential_manager() diff --git a/easytier/src/peers/tests.rs b/easytier/src/peers/tests.rs index f5513627..81d303aa 100644 --- a/easytier/src/peers/tests.rs +++ b/easytier/src/peers/tests.rs @@ -163,7 +163,7 @@ async fn relay_peer_map_secure_session_decrypt() { set_secure_mode_cfg(&ctx, true); let peer_map = Arc::new(PeerMap::new(s, ctx.clone(), 10)); let store = Arc::new(PeerSessionStore::new()); - let relay_map = RelayPeerMap::new(peer_map, ctx.clone(), 10, store.clone()); + let relay_map = RelayPeerMap::new(peer_map, None, ctx.clone(), 10, store.clone()); let algo = ctx.get_flags().encryption_algorithm.clone(); let root_key = [7u8; 32]; @@ -188,7 +188,7 @@ async fn relay_peer_map_secure_session_decrypt() { let mut packet = ZCPacket::new_with_payload(b"relay-hello"); packet.fill_peer_manager_hdr(20, 10, PacketType::Data as u8); session.encrypt_payload(20, 10, &mut packet).unwrap(); - assert!(relay_map.decrypt_if_needed(&mut packet).unwrap()); + assert!(relay_map.decrypt_if_needed(&mut packet).await.unwrap()); assert_eq!(packet.payload(), b"relay-hello"); } @@ -200,6 +200,7 @@ async fn relay_peer_map_retry_backoff_and_evict() { let peer_map = Arc::new(PeerMap::new(s, ctx_secure.clone(), 10)); let relay_map = RelayPeerMap::new( peer_map, + None, ctx_secure.clone(), 10, Arc::new(PeerSessionStore::new()), @@ -217,6 +218,7 @@ async fn relay_peer_map_retry_backoff_and_evict() { let peer_map_plain = Arc::new(PeerMap::new(s2, ctx_plain.clone(), 30)); let relay_map_plain = RelayPeerMap::new( peer_map_plain, + None, ctx_plain.clone(), 30, Arc::new(PeerSessionStore::new()), @@ -244,7 +246,7 @@ async fn relay_peer_map_pending_packet_buffer() { set_secure_mode_cfg(&ctx, true); let peer_map = Arc::new(PeerMap::new(s, ctx.clone(), 10)); let store = Arc::new(PeerSessionStore::new()); - let relay_map = RelayPeerMap::new(peer_map, ctx.clone(), 10, store.clone()); + let relay_map = RelayPeerMap::new(peer_map, None, ctx.clone(), 10, store.clone()); // Send multiple packets while no session exists (handshake will fail, but packets should be buffered) for i in 0..5u8 { @@ -554,7 +556,7 @@ async fn relay_peer_map_remove_peer() { set_secure_mode_cfg(&ctx, true); let peer_map = Arc::new(PeerMap::new(s, ctx.clone(), 10)); let store = Arc::new(PeerSessionStore::new()); - let relay_map = RelayPeerMap::new(peer_map, ctx.clone(), 10, store.clone()); + let relay_map = RelayPeerMap::new(peer_map, None, ctx.clone(), 10, store.clone()); let peer_1: PeerId = 100; diff --git a/easytier/src/proto/peer_rpc.proto b/easytier/src/proto/peer_rpc.proto index 84538b71..5ededd12 100644 --- a/easytier/src/proto/peer_rpc.proto +++ b/easytier/src/proto/peer_rpc.proto @@ -13,6 +13,11 @@ message TrustedCredentialPubkey { repeated string allowed_proxy_cidrs = 5; // allowed proxy_cidrs ranges } +message TrustedCredentialPubkeyProof { + TrustedCredentialPubkey credential = 1; + bytes credential_hmac = 2; +} + message RoutePeerInfo { // means next hop in route table. uint32 peer_id = 1; @@ -40,7 +45,7 @@ message RoutePeerInfo { bytes noise_static_pubkey = 18; // Trusted credential public keys published by admin nodes (holding network_secret) - repeated TrustedCredentialPubkey trusted_credential_pubkeys = 19; + repeated TrustedCredentialPubkeyProof trusted_credential_pubkeys = 19; } message PeerIdVersion { @@ -313,22 +318,18 @@ message PeerConnNoiseMsg2Pb { message RelayNoiseMsg1Pb { uint32 version = 1; - string a_network_name = 2; optional uint32 a_session_generation = 3; common.UUID a_conn_id = 4; string client_encryption_algorithm = 5; } message RelayNoiseMsg2Pb { - string b_network_name = 1; - uint32 role_hint = 2; PeerConnSessionActionPb action = 3; uint32 b_session_generation = 4; optional bytes root_key_32 = 5; uint32 initial_epoch = 6; common.UUID b_conn_id = 7; common.UUID a_conn_id_echo = 8; - optional bytes secret_proof_32 = 9; string server_encryption_algorithm = 10; } diff --git a/easytier/src/proto/peer_rpc.rs b/easytier/src/proto/peer_rpc.rs index f6a5fded..f60a72a4 100644 --- a/easytier/src/proto/peer_rpc.rs +++ b/easytier/src/proto/peer_rpc.rs @@ -1,4 +1,5 @@ use hmac::{Hmac, Mac}; +use prost::Message; use sha2::Sha256; use crate::common::PeerId; @@ -38,6 +39,42 @@ impl PeerGroupInfo { } } +impl TrustedCredentialPubkeyProof { + pub fn generate_credential_hmac( + credential: &TrustedCredentialPubkey, + network_secret: &str, + ) -> Vec { + let mut mac = Hmac::::new_from_slice(network_secret.as_bytes()) + .expect("HMAC can take key of any size"); + mac.update(b"easytier credential proof"); + mac.update(&credential.encode_to_vec()); + mac.finalize().into_bytes().to_vec() + } + + pub fn new_signed(credential: TrustedCredentialPubkey, network_secret: &str) -> Self { + let credential_hmac = Self::generate_credential_hmac(&credential, network_secret); + Self { + credential: Some(credential), + credential_hmac, + } + } + + pub fn verify_credential_hmac(&self, network_secret: &str) -> bool { + let Some(credential) = self.credential.as_ref() else { + return false; + }; + if self.credential_hmac.is_empty() { + return false; + } + + let mut mac = Hmac::::new_from_slice(network_secret.as_bytes()) + .expect("HMAC can take key of any size"); + mac.update(b"easytier credential proof"); + mac.update(&credential.encode_to_vec()); + mac.verify_slice(&self.credential_hmac).is_ok() + } +} + impl From for sync_route_info_request::ConnInfo { fn from(val: RouteConnBitmap) -> Self { Self::ConnBitmap(val) @@ -254,4 +291,35 @@ mod tests { println!("verify took {:?} for {} iterations", duration, iterations); println!("Avg time per iteration: {:?}", duration / iterations as u32); } + + #[test] + fn test_trusted_credential_pubkey_hmac_valid() { + let credential = TrustedCredentialPubkey { + pubkey: vec![7u8; 32], + groups: vec!["ops".to_string(), "guest".to_string()], + allow_relay: true, + expiry_unix: 123456, + allowed_proxy_cidrs: vec!["10.0.0.0/24".to_string()], + }; + let tc = TrustedCredentialPubkeyProof::new_signed(credential, "sec-1"); + + assert!(tc.verify_credential_hmac("sec-1")); + assert!(!tc.verify_credential_hmac("sec-2")); + } + + #[test] + fn test_trusted_credential_pubkey_hmac_tampered() { + let credential = TrustedCredentialPubkey { + pubkey: vec![8u8; 32], + groups: vec!["g1".to_string()], + allow_relay: false, + expiry_unix: 1, + allowed_proxy_cidrs: vec![], + }; + let tc = TrustedCredentialPubkeyProof::new_signed(credential, "sec-1"); + + let mut tampered = tc.clone(); + tampered.credential.as_mut().unwrap().allow_relay = true; + assert!(!tampered.verify_credential_hmac("sec-1")); + } } diff --git a/easytier/src/tests/credential_tests.rs b/easytier/src/tests/credential_tests.rs index 9803816e..650daedc 100644 --- a/easytier/src/tests/credential_tests.rs +++ b/easytier/src/tests/credential_tests.rs @@ -148,6 +148,29 @@ fn create_admin_config( config } +fn create_shared_config( + inst_name: &str, + ns: Option<&str>, + ipv4: &str, + ipv6: &str, +) -> TomlConfigLoader { + let config = TomlConfigLoader::default(); + config.set_inst_name(inst_name.to_owned()); + config.set_netns(ns.map(|s| s.to_owned())); + config.set_ipv4(Some(ipv4.parse().unwrap())); + config.set_ipv6(Some(ipv6.parse().unwrap())); + config.set_listeners(vec![ + "tcp://0.0.0.0:11010".parse().unwrap(), + "udp://0.0.0.0:11010".parse().unwrap(), + ]); + config.set_network_identity(NetworkIdentity::new( + "shared_network".to_string(), + "".to_string(), + )); + config.set_secure_mode(Some(generate_secure_mode_config())); + config +} + /// Test 1: Basic credential node connectivity /// Topology: Admin ← Credential /// Verifies that a credential node can connect to an admin node and appears in routes @@ -298,7 +321,6 @@ async fn credential_relay_capability(#[case] allow_relay: bool) { config.set_netns(Some("ns_c1".to_string())); config.set_ipv4(Some("10.144.144.2".parse().unwrap())); config.set_ipv6(Some("fd00::2/64".parse().unwrap())); - config.set_listeners(vec!["tcp://0.0.0.0:11021".parse().unwrap()]); config.set_network_identity(NetworkIdentity::new_credential( admin_inst .get_global_ctx() @@ -326,7 +348,6 @@ async fn credential_relay_capability(#[case] allow_relay: bool) { config.set_netns(Some("ns_c2".to_string())); config.set_ipv4(Some("10.144.144.3".parse().unwrap())); config.set_ipv6(Some("fd00::3/64".parse().unwrap())); - config.set_listeners(vec!["tcp://0.0.0.0:11022".parse().unwrap()]); config.set_network_identity(NetworkIdentity::new_credential( admin_inst .get_global_ctx() @@ -787,3 +808,124 @@ async fn credential_unknown_rejected() { drop_insts(vec![admin_inst, cred_inst]).await; } + +#[rstest::rstest] +#[tokio::test] +#[serial_test::serial] +async fn credential_admin_shared_admin_credential_connectivity( + #[values(true, false)] connect_to_admin: bool, +) { + prepare_credential_network(); + + // 10.1.1.1 + let admin_a_config = + create_admin_config("admin_a", Some("ns_adm"), "10.144.144.1", "fd00::1/64"); + let mut admin_a_inst = Instance::new(admin_a_config); + admin_a_inst.run().await.unwrap(); + + // 10.1.1.2 + let shared_b_config = + create_shared_config("shared_b", Some("ns_c1"), "10.144.144.2", "fd00::2/64"); + let mut shared_b_inst = Instance::new(shared_b_config); + shared_b_inst.run().await.unwrap(); + + // 10.1.1.4 + let admin_c_config = + create_admin_config("admin_c", Some("ns_c3"), "10.144.144.4", "fd00::4/64"); + let mut admin_c_inst = Instance::new(admin_c_config); + admin_c_inst.run().await.unwrap(); + + admin_a_inst + .get_conn_manager() + .add_connector(TcpTunnelConnector::new( + "tcp://10.1.1.2:11010".parse().unwrap(), + )); + admin_c_inst + .get_conn_manager() + .add_connector(TcpTunnelConnector::new( + "tcp://10.1.1.2:11010".parse().unwrap(), + )); + + // print all peer ids + println!("admin_a_peer_id: {:?}", admin_a_inst.peer_id()); + println!("shared_b_peer_id: {:?}", shared_b_inst.peer_id()); + println!("admin_c_peer_id: {:?}", admin_c_inst.peer_id()); + + let admin_c_peer_id = admin_c_inst.peer_id(); + wait_for_condition( + || async { + let a_routes = admin_a_inst.get_peer_manager().list_routes().await; + let c_routes = admin_c_inst.get_peer_manager().list_routes().await; + println!( + "bootstrap routes: a={:?} c={:?}", + a_routes.iter().map(|r| r.peer_id).collect::>(), + c_routes.iter().map(|r| r.peer_id).collect::>() + ); + a_routes.iter().any(|r| r.peer_id == admin_c_peer_id) + || c_routes.iter().any(|r| r.peer_id == admin_a_inst.peer_id()) + }, + Duration::from_secs(3), + ) + .await; + + let cred_d_config = create_credential_config( + &admin_a_inst, + "cred_d", + Some("ns_c2"), + "10.144.144.5", + "fd00::5/64", + ) + .await; + admin_a_inst + .get_global_ctx() + .issue_event(GlobalCtxEvent::CredentialChanged); + + let mut cred_d_inst = Instance::new(cred_d_config); + cred_d_inst.run().await.unwrap(); + let cred_d_peer_id = cred_d_inst.peer_id(); + + cred_d_inst + .get_conn_manager() + .add_connector(TcpTunnelConnector::new(if !connect_to_admin { + // connect to shared node + "tcp://10.1.1.2:11010".parse().unwrap() + } else { + // connect to admin node + "tcp://10.1.1.4:11010".parse().unwrap() + })); + // print all peer ids + println!("cred_d_peer_id: {:?}", cred_d_peer_id); + + wait_for_condition( + || async { + admin_c_inst + .get_peer_manager() + .list_routes() + .await + .iter() + .any(|r| r.peer_id == cred_d_peer_id) + }, + Duration::from_secs(60), + ) + .await; + + wait_for_condition( + || async { ping_test("ns_c3", "10.144.144.5", None).await }, + Duration::from_secs(15), + ) + .await; + + wait_for_condition( + || async { ping_test("ns_adm", "10.144.144.5", None).await }, + Duration::from_secs(15), + ) + .await; + + wait_for_condition( + || async { ping_test("ns_c2", "10.144.144.4", None).await }, + Duration::from_secs(15), + ) + .await; + + drop_insts(vec![admin_a_inst, shared_b_inst, admin_c_inst, cred_d_inst]).await; +}