multi_fix: harden peer/session handling, tighten foreign-network trust, and improve web client metadata (#1999)

* machine-id should be scoped unbder same user-id
* feat: report device os metadata to console
* fix sync root key cause packet loss
* fix tun packet not invalid
* fix faketcp cause lat jitter
* fix some packet not decrypt
* fix peer info patch, improve performance of update self info
* fix foreign credential identity mismatch handling
This commit is contained in:
KKRainbow
2026-03-21 21:06:07 +08:00
committed by GitHub
parent 77966916c4
commit 2bfdd44759
24 changed files with 1381 additions and 358 deletions
+55 -1
View File
@@ -134,6 +134,15 @@ impl TrustedKeyMapManager {
}
pub fn verify_trusted_key(&self, pubkey: &[u8], network_name: &str) -> bool {
self.verify_trusted_key_with_source(pubkey, network_name, None)
}
pub fn verify_trusted_key_with_source(
&self,
pubkey: &[u8],
network_name: &str,
source: Option<TrustedKeySource>,
) -> bool {
let Some(trusted_keys) = self
.network_trusted_keys
.get(network_name)
@@ -146,7 +155,11 @@ impl TrustedKeyMapManager {
return false;
};
!metadata.is_expired()
if let Some(source) = source {
metadata.source == source && !metadata.is_expired()
} else {
!metadata.is_expired()
}
}
pub fn list_trusted_keys(&self, network_name: &str) -> Vec<(Vec<u8>, TrustedKeyMetadata)> {
@@ -542,6 +555,16 @@ impl GlobalCtx {
false
}
pub fn is_pubkey_trusted_with_source(
&self,
pubkey: &[u8],
network_name: &str,
source: TrustedKeySource,
) -> bool {
self.trusted_keys
.verify_trusted_key_with_source(pubkey, network_name, Some(source))
}
/// Atomically replace all OSPF trusted keys with a new set
/// Called by OSPF route layer after each route update
pub fn update_trusted_keys(&self, keys: TrustedKeyMap, network_name: &str) {
@@ -676,6 +699,37 @@ pub mod tests {
);
}
#[tokio::test]
async fn trusted_key_source_lookup_is_precise() {
let config = TomlConfigLoader::default();
let global_ctx = GlobalCtx::new(config);
let network_name = "net1";
let pubkey = vec![1; 32];
global_ctx.update_trusted_keys(
HashMap::from([(
pubkey.clone(),
TrustedKeyMetadata {
source: TrustedKeySource::OspfCredential,
expiry_unix: None,
},
)]),
network_name,
);
assert!(global_ctx.is_pubkey_trusted(&pubkey, network_name));
assert!(!global_ctx.is_pubkey_trusted_with_source(
&pubkey,
network_name,
TrustedKeySource::OspfNode,
));
assert!(global_ctx.is_pubkey_trusted_with_source(
&pubkey,
network_name,
TrustedKeySource::OspfCredential,
));
}
pub fn get_mock_global_ctx_with_network(
network_identy: Option<NetworkIdentity>,
) -> ArcGlobalCtx {
+1
View File
@@ -24,6 +24,7 @@ pub mod ifcfg;
pub mod log;
pub mod netns;
pub mod network;
pub mod os_info;
pub mod scoped_task;
pub mod stats_manager;
pub mod stun;
+144
View File
@@ -0,0 +1,144 @@
use std::{collections::HashMap, fs, process::Command};
use crate::proto::web::DeviceOsInfo;
pub fn collect_device_os_info() -> DeviceOsInfo {
let os_type = normalize_os_type(std::env::consts::OS);
let (version, distribution) = detect_os_version_and_distribution(&os_type);
DeviceOsInfo {
os_type,
version,
distribution,
}
}
fn normalize_os_type(raw: &str) -> String {
match raw {
"macos" => "macos".to_string(),
"windows" => "windows".to_string(),
"linux" => "linux".to_string(),
"android" => "android".to_string(),
"ios" => "ios".to_string(),
"freebsd" => "freebsd".to_string(),
other => other.to_string(),
}
}
fn detect_os_version_and_distribution(os_type: &str) -> (String, String) {
match os_type {
"linux" | "android" => linux_version_and_distribution(os_type),
"macos" => (
first_non_empty([
command_output("sw_vers", &["-productVersion"]),
unix_kernel_release(),
]),
"macOS".to_string(),
),
"windows" => (
first_non_empty([windows_version(), None]),
"Windows".to_string(),
),
"freebsd" => (
first_non_empty([
command_output("freebsd-version", &[]),
unix_kernel_release(),
]),
"FreeBSD".to_string(),
),
other => (
unix_kernel_release().unwrap_or_else(|| "unknown".to_string()),
other.to_string(),
),
}
}
fn linux_version_and_distribution(os_type: &str) -> (String, String) {
let os_release = parse_os_release().unwrap_or_default();
let version = first_non_empty([
os_release.get("VERSION_ID").cloned(),
os_release.get("VERSION").cloned(),
unix_kernel_release(),
]);
let distribution = first_non_empty([
os_release.get("NAME").cloned(),
os_release.get("ID").cloned().map(title_case),
Some(if os_type == "android" {
"Android".to_string()
} else {
"Linux".to_string()
}),
]);
(version, distribution)
}
fn parse_os_release() -> Option<HashMap<String, String>> {
["/etc/os-release", "/usr/lib/os-release"]
.into_iter()
.find_map(|path| fs::read_to_string(path).ok())
.map(|content| {
content
.lines()
.filter_map(|line| {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
return None;
}
let (key, value) = line.split_once('=')?;
Some((key.to_string(), trim_os_release_value(value)))
})
.collect()
})
}
fn trim_os_release_value(value: &str) -> String {
value
.trim()
.trim_matches('"')
.trim_matches('\'')
.to_string()
}
fn unix_kernel_release() -> Option<String> {
command_output("uname", &["-r"])
}
fn windows_version() -> Option<String> {
let output = command_output("cmd", &["/C", "ver"])?;
output
.split("Version")
.nth(1)
.map(str::trim)
.map(|part| part.trim_matches(&['[', ']'][..]).to_string())
.filter(|value| !value.is_empty())
}
fn command_output(program: &str, args: &[&str]) -> Option<String> {
let output = Command::new(program).args(args).output().ok()?;
if !output.status.success() {
return None;
}
let value = String::from_utf8(output.stdout).ok()?;
let value = value.trim();
if value.is_empty() {
None
} else {
Some(value.to_string())
}
}
fn first_non_empty<const N: usize>(values: [Option<String>; N]) -> String {
values
.into_iter()
.flatten()
.find(|value| !value.trim().is_empty())
.unwrap_or_else(|| "unknown".to_string())
}
fn title_case(value: String) -> String {
let mut chars = value.chars();
let Some(first) = chars.next() else {
return value;
};
first.to_uppercase().collect::<String>() + chars.as_str()
}
+164 -14
View File
@@ -303,6 +303,26 @@ impl ForeignNetworkEntry {
fn my_peer_id(&self) -> PeerId {
self.my_peer_id
}
fn need_periodic_requery_peers(&self) -> bool {
true
}
async fn get_peer_identity_type(&self, peer_id: PeerId) -> Option<PeerIdentityType> {
let peer_map = self.peer_map.upgrade()?;
peer_map.get_peer_identity_type(peer_id)
}
async fn get_peer_public_key(&self, peer_id: PeerId) -> Option<Vec<u8>> {
let peer_map = self.peer_map.upgrade()?;
peer_map.get_peer_public_key(peer_id)
}
async fn close_peer(&self, peer_id: PeerId) {
if let Some(peer_map) = self.peer_map.upgrade() {
let _ = peer_map.close_peer(peer_id).await;
}
}
}
let route = PeerRoute::new(
@@ -373,15 +393,15 @@ impl ForeignNetworkEntry {
continue;
}
if !peer_map.has_peer(from_peer_id) && relay_peer_map.is_secure_mode_enabled() {
if relay_peer_map.is_secure_mode_enabled() && hdr.is_encrypted() {
match relay_peer_map.decrypt_if_needed(&mut zc_packet).await {
Ok(true) => {}
Ok(false) => {
tracing::error!("relay session not found");
tracing::error!("secure session not found");
continue;
}
Err(e) => {
tracing::error!(?e, "relay decrypt failed");
tracing::error!(?e, "secure decrypt failed");
continue;
}
}
@@ -620,14 +640,27 @@ pub struct ForeignNetworkManager {
}
impl ForeignNetworkManager {
async fn is_shared_pubkey_trusted(
fn network_secret_digest_is_empty(network: &NetworkIdentity) -> bool {
network
.network_secret_digest
.as_ref()
.is_none_or(|d| d.iter().all(|b| *b == 0))
}
fn should_reject_credential_trust_path(identity_type: PeerIdentityType) -> bool {
matches!(identity_type, PeerIdentityType::Admin)
}
async fn is_credential_pubkey_trusted(
entry: &ForeignNetworkEntry,
remote_static_pubkey: &[u8],
) -> bool {
remote_static_pubkey.len() == 32
&& entry
.global_ctx
.is_pubkey_trusted(remote_static_pubkey, &entry.network.network_name)
&& entry.global_ctx.is_pubkey_trusted_with_source(
remote_static_pubkey,
&entry.network.network_name,
TrustedKeySource::OspfCredential,
)
}
fn build_trusted_key_items(entry: &ForeignNetworkEntry) -> Vec<TrustedKeyInfoPb> {
@@ -697,6 +730,20 @@ impl ForeignNetworkManager {
return ret;
}
let peer_digest_empty = Self::network_secret_digest_is_empty(&peer_network);
if peer_digest_empty
&& self
.data
.get_network_entry(&peer_network.network_name)
.is_none()
{
return Err(anyhow::anyhow!(
"foreign network {} is not established by a secret-verified peer yet",
peer_network.network_name
)
.into());
}
let (entry, new_added) = self
.data
.get_or_insert_entry(
@@ -711,13 +758,17 @@ impl ForeignNetworkManager {
.await;
let same_identity = entry.network == peer_network;
let shared_peer = peer_conn.get_peer_identity_type() == PeerIdentityType::SharedNode;
let shared_peer_trusted = shared_peer
&& Self::is_shared_pubkey_trusted(&entry, &conn_info.noise_remote_static_pubkey).await;
let peer_identity_type = peer_conn.get_peer_identity_type();
let credential_peer_trusted = peer_digest_empty
&& Self::is_credential_pubkey_trusted(&entry, &conn_info.noise_remote_static_pubkey)
.await;
let credential_identity_mismatch = credential_peer_trusted
&& Self::should_reject_credential_trust_path(peer_identity_type);
let _g = entry.lock.lock().await;
if (!(same_identity || shared_peer_trusted))
if (!(same_identity || credential_peer_trusted))
|| credential_identity_mismatch
|| entry.my_peer_id != peer_conn.get_my_peer_id()
{
if new_added {
@@ -730,13 +781,18 @@ impl ForeignNetworkManager {
entry.my_peer_id,
peer_conn.get_my_peer_id()
)
} else if credential_identity_mismatch {
anyhow::anyhow!(
"credential-trusted foreign peer has invalid identity type: {:?}",
peer_identity_type
)
} else {
anyhow::anyhow!(
"foreign peer identity not trusted. exp: {:?} real: {:?}, remote_pubkey_len: {}, shared_trusted: {}",
"foreign peer identity not trusted. exp: {:?} real: {:?}, remote_pubkey_len: {}, credential_trusted: {}",
entry.network,
peer_network,
conn_info.noise_remote_static_pubkey.len(),
shared_peer_trusted,
credential_peer_trusted,
)
};
tracing::error!(?err, "foreign network entry not match, disconnect peer");
@@ -911,7 +967,7 @@ pub mod tests {
set_global_var,
tunnel::common::tests::wait_for_condition,
};
use std::time::Duration;
use std::{collections::HashMap, time::Duration};
use super::*;
@@ -933,6 +989,20 @@ pub mod tests {
peer_mgr
}
async fn create_mock_credential_peer_manager_for_foreign_network(
network: &str,
) -> Arc<PeerManager> {
let (s, _r) = create_packet_recv_chan();
let global_ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new_credential(
network.to_string(),
)));
set_secure_mode_cfg(&global_ctx, true);
let peer_mgr = Arc::new(PeerManager::new(RouteAlgoType::Ospf, global_ctx, s));
replace_stun_info_collector(peer_mgr.clone(), NatType::Unknown);
peer_mgr.run().await.unwrap();
peer_mgr
}
pub async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc<PeerManager> {
create_mock_peer_manager_for_foreign_network_ext(network, network).await
}
@@ -1027,6 +1097,86 @@ pub mod tests {
.is_empty());
}
#[tokio::test]
async fn credential_pubkey_trust_requires_ospf_credential_source() {
let global_ctx = get_mock_global_ctx_with_network(Some(NetworkIdentity::new(
"__access__".to_string(),
"access_secret".to_string(),
)));
let foreign_network = NetworkIdentity::new("net1".to_string(), "net1_secret".to_string());
let (pm_packet_sender, _pm_packet_recv) = create_packet_recv_chan();
let entry = ForeignNetworkEntry::new(
foreign_network.clone(),
1,
global_ctx.clone(),
false,
Arc::new(PeerSessionStore::new()),
pm_packet_sender,
);
let pubkey = vec![7; 32];
entry.global_ctx.update_trusted_keys(
HashMap::from([(
pubkey.clone(),
crate::common::global_ctx::TrustedKeyMetadata {
source: TrustedKeySource::OspfNode,
expiry_unix: None,
},
)]),
&foreign_network.network_name,
);
assert!(!ForeignNetworkManager::is_credential_pubkey_trusted(&entry, &pubkey).await);
entry.global_ctx.update_trusted_keys(
HashMap::from([(
pubkey.clone(),
crate::common::global_ctx::TrustedKeyMetadata {
source: TrustedKeySource::OspfCredential,
expiry_unix: None,
},
)]),
&foreign_network.network_name,
);
assert!(ForeignNetworkManager::is_credential_pubkey_trusted(&entry, &pubkey).await);
}
#[test]
fn credential_trust_path_rejects_admin_identity() {
assert!(ForeignNetworkManager::should_reject_credential_trust_path(
PeerIdentityType::Admin
));
assert!(!ForeignNetworkManager::should_reject_credential_trust_path(
PeerIdentityType::Credential
));
assert!(!ForeignNetworkManager::should_reject_credential_trust_path(
PeerIdentityType::SharedNode
));
}
#[tokio::test]
async fn zero_digest_peer_cannot_bootstrap_foreign_network() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
set_secure_mode_cfg(&pm_center.get_global_ctx(), true);
let pma_net1 = create_mock_credential_peer_manager_for_foreign_network("net1").await;
let (a_ring, b_ring) = crate::tunnel::ring::create_ring_tunnel_pair();
let a_mgr_copy = pma_net1.clone();
let client = tokio::spawn(async move { a_mgr_copy.add_client_tunnel(a_ring, false).await });
let b_mgr_copy = pm_center.clone();
let server =
tokio::spawn(async move { b_mgr_copy.add_tunnel_as_server(b_ring, true).await });
assert!(client.await.unwrap().is_ok());
assert!(server.await.unwrap().is_err());
assert!(pm_center
.get_foreign_network_manager()
.list_foreign_networks()
.await
.foreign_networks
.is_empty());
}
async fn foreign_network_whitelist_helper(name: String) {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
tracing::debug!("pm_center: {:?}", pm_center.my_peer_id());
+109 -1
View File
@@ -2,6 +2,7 @@ use std::sync::Arc;
use crossbeam::atomic::AtomicCell;
use dashmap::{DashMap, DashSet};
use parking_lot::RwLock;
use tokio::{select, sync::mpsc};
@@ -42,6 +43,7 @@ pub struct Peer {
default_conn_id: Arc<AtomicCell<PeerConnId>>,
peer_identity_type: Arc<AtomicCell<Option<PeerIdentityType>>>,
peer_public_key: Arc<RwLock<Option<Vec<u8>>>>,
default_conn_id_clear_task: ScopedTask<()>,
}
@@ -56,6 +58,8 @@ impl Peer {
let shutdown_notifier = Arc::new(tokio::sync::Notify::new());
let peer_identity_type = Arc::new(AtomicCell::new(None));
let peer_identity_type_copy = peer_identity_type.clone();
let peer_public_key = Arc::new(RwLock::new(None));
let peer_public_key_copy = peer_public_key.clone();
let conns_copy = conns.clone();
let shutdown_notifier_copy = shutdown_notifier.clone();
@@ -82,6 +86,7 @@ impl Peer {
shrink_dashmap(&conns_copy, Some(4));
if conns_copy.is_empty() {
peer_identity_type_copy.store(None);
*peer_public_key_copy.write() = None;
}
}
}
@@ -126,6 +131,7 @@ impl Peer {
shutdown_notifier,
default_conn_id,
peer_identity_type,
peer_public_key,
default_conn_id_clear_task,
}
}
@@ -146,6 +152,22 @@ impl Peer {
let close_notifier = conn.get_close_notifier();
let conn_info = conn.get_conn_info();
let conn_pubkey = conn_info.noise_remote_static_pubkey.clone();
{
let mut peer_pubkey = self.peer_public_key.write();
if let Some(existing_pubkey) = peer_pubkey.as_ref() {
if existing_pubkey != &conn_pubkey {
return Err(Error::SecretKeyError(format!(
"peer public key mismatch. peer_id: {}, existing_len: {}, new_len: {}",
self.peer_node_id,
existing_pubkey.len(),
conn_pubkey.len()
)));
}
} else {
*peer_pubkey = Some(conn_pubkey);
}
}
conn.start_recv_loop(self.packet_recv_chan.clone()).await;
conn.start_pingpong();
@@ -226,10 +248,14 @@ impl Peer {
ret
}
pub fn has_live_conns(&self) -> bool {
self.conns.iter().any(|entry| !entry.value().is_closed())
}
pub fn has_directly_connected_conn(&self) -> bool {
self.conns
.iter()
.any(|entry| !(entry.value()).is_hole_punched())
.any(|entry| !entry.value().is_closed() && !entry.value().is_hole_punched())
}
pub fn get_directly_connections(&self) -> DashSet<uuid::Uuid> {
@@ -247,6 +273,10 @@ impl Peer {
pub fn get_peer_identity_type(&self) -> Option<PeerIdentityType> {
self.peer_identity_type.load()
}
pub fn get_peer_public_key(&self) -> Option<Vec<u8>> {
self.peer_public_key.read().clone()
}
}
// pritn on drop
@@ -458,4 +488,82 @@ mod tests {
let ret = peer.add_peer_conn(admin_client_conn).await;
assert!(ret.is_err());
}
#[tokio::test]
async fn reject_peer_conn_with_mismatched_public_key() {
let (packet_send, _packet_recv) = create_packet_recv_chan();
let local_peer_id = new_peer_id();
let remote_peer_id = new_peer_id();
let peer = Peer::new(remote_peer_id, packet_send, get_mock_global_ctx());
let ps = Arc::new(PeerSessionStore::new());
let (client_tunnel_1, server_tunnel_1) = create_ring_tunnel_pair();
let client_ctx_1 = get_mock_global_ctx();
let server_ctx_1 = get_mock_global_ctx();
client_ctx_1
.config
.set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string()));
server_ctx_1
.config
.set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string()));
set_secure_mode_cfg(&client_ctx_1, true);
set_secure_mode_cfg(&server_ctx_1, true);
let mut client_conn_1 = PeerConn::new(
local_peer_id,
client_ctx_1,
Box::new(client_tunnel_1),
ps.clone(),
);
let mut server_conn_1 = PeerConn::new(
remote_peer_id,
server_ctx_1,
Box::new(server_tunnel_1),
ps.clone(),
);
let (c1, s1) = tokio::join!(
client_conn_1.do_handshake_as_client(),
server_conn_1.do_handshake_as_server()
);
c1.unwrap();
s1.unwrap();
let (client_tunnel_2, server_tunnel_2) = create_ring_tunnel_pair();
let client_ctx_2 = get_mock_global_ctx();
let server_ctx_2 = get_mock_global_ctx();
client_ctx_2
.config
.set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string()));
server_ctx_2
.config
.set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string()));
set_secure_mode_cfg(&client_ctx_2, true);
set_secure_mode_cfg(&server_ctx_2, true);
let mut client_conn_2 = PeerConn::new(
local_peer_id,
client_ctx_2,
Box::new(client_tunnel_2),
Arc::new(PeerSessionStore::new()),
);
let mut server_conn_2 = PeerConn::new(
remote_peer_id,
server_ctx_2,
Box::new(server_tunnel_2),
Arc::new(PeerSessionStore::new()),
);
let (c2, s2) = tokio::join!(
client_conn_2.do_handshake_as_client(),
server_conn_2.do_handshake_as_server()
);
c2.unwrap();
s2.unwrap();
let pubkey_1 = client_conn_1.get_conn_info().noise_remote_static_pubkey;
let pubkey_2 = client_conn_2.get_conn_info().noise_remote_static_pubkey;
assert_ne!(pubkey_1, pubkey_2);
peer.add_peer_conn(client_conn_1).await.unwrap();
assert_eq!(peer.get_peer_public_key(), Some(pubkey_1));
let ret = peer.add_peer_conn(client_conn_2).await;
assert!(ret.is_err());
}
}
+26 -13
View File
@@ -455,6 +455,10 @@ impl PeerConn {
self.is_hole_punched
}
pub fn is_closed(&self) -> bool {
self.close_event_notifier.is_closed()
}
async fn wait_handshake(&self, need_retry: &mut bool) -> Result<HandshakeRequest, Error> {
*need_retry = false;
@@ -687,9 +691,9 @@ impl PeerConn {
/// | Admin | Admin | same network_secret, proof verified | NetworkSecretConfirmed | NetworkSecretConfirmed | Admin | Admin |
/// | Credential | Admin | client pubkey is trusted by admin | EncryptedUnauthenticated | PeerVerified | Admin | Credential |
/// | Credential | Admin | client pubkey is unknown | handshake may fail | handshake reject | unknown | unknown |
/// | Admin | SharedNode | pinned key match | PeerVerified | EncryptedUnauthenticated | SharedNode | SharedNode |
/// | Admin | SharedNode | local has no pinned key requirement | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | SharedNode |
/// | Credential | SharedNode | no pin and not trusted | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | SharedNode |
/// | Admin | SharedNode | pinned key match | PeerVerified | EncryptedUnauthenticated | SharedNode | Admin |
/// | Admin | SharedNode | local has no pinned key requirement | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | Admin |
/// | Credential | SharedNode | no pin and not trusted | EncryptedUnauthenticated | EncryptedUnauthenticated | SharedNode | Credential |
/// | Credential | Credential | should reject | handshake reject | handshake reject | unknown | unknown |
///
/// Logic (in priority order):
@@ -764,20 +768,27 @@ impl PeerConn {
secure_auth_level: SecureAuthLevel,
remote_role_hint_is_same_network: bool,
remote_sent_secret_proof: bool,
is_client: bool,
) -> PeerIdentityType {
if !remote_role_hint_is_same_network
|| remote_network_name != self.global_ctx.get_network_name()
{
return PeerIdentityType::SharedNode;
}
if is_client {
PeerIdentityType::SharedNode
} else if remote_sent_secret_proof {
PeerIdentityType::Admin
} else {
PeerIdentityType::Credential
}
} else {
if matches!(secure_auth_level, SecureAuthLevel::NetworkSecretConfirmed)
|| remote_sent_secret_proof
{
return PeerIdentityType::Admin;
}
if matches!(secure_auth_level, SecureAuthLevel::NetworkSecretConfirmed)
|| remote_sent_secret_proof
{
return PeerIdentityType::Admin;
PeerIdentityType::Credential
}
PeerIdentityType::Credential
}
async fn do_noise_handshake_as_client(&self) -> Result<NoiseHandshakeResult, Error> {
@@ -920,6 +931,7 @@ impl PeerConn {
secure_auth_level,
msg2_pb.role_hint == 1,
remote_sent_secret_proof,
true,
);
let handshake_hash = hs.get_handshake_hash().to_vec();
@@ -1174,6 +1186,7 @@ impl PeerConn {
secure_auth_level,
role_hint == 1,
msg3_pb.secret_proof_32.is_some(),
false,
);
let handshake_hash = hs.get_handshake_hash().to_vec();
@@ -1948,7 +1961,7 @@ pub mod tests {
);
assert_eq!(
s_peer.get_conn_info().peer_identity_type,
PeerIdentityType::SharedNode as i32,
PeerIdentityType::Admin as i32,
);
}
@@ -1999,7 +2012,7 @@ pub mod tests {
);
assert_eq!(
s_peer.get_conn_info().peer_identity_type,
PeerIdentityType::SharedNode as i32,
PeerIdentityType::Admin as i32,
);
}
+18 -5
View File
@@ -819,17 +819,15 @@ impl PeerManager {
tracing::error!(?e, "decrypt failed");
continue;
}
} else if !peers.has_peer(from_peer_id)
&& !foreign_client.has_next_hop(from_peer_id)
{
} else if hdr.is_encrypted() {
match relay_peer_map.decrypt_if_needed(&mut ret).await {
Ok(true) => {}
Ok(false) => {
tracing::error!("relay session not found");
tracing::error!("secure session not found");
continue;
}
Err(e) => {
tracing::error!(?e, "relay decrypt failed");
tracing::error!(?e, "secure decrypt failed");
continue;
}
}
@@ -904,6 +902,16 @@ impl PeerManager {
async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option<ZCPacket> {
let hdr = packet.peer_manager_header().unwrap();
if hdr.packet_type == PacketType::Data as u8 && !hdr.is_not_send_to_tun() {
if hdr.is_encrypted() || hdr.is_compressed() {
tracing::warn!(
from_peer_id = hdr.from_peer_id.get(),
to_peer_id = hdr.to_peer_id.get(),
encrypted = hdr.is_encrypted(),
compressed = hdr.is_compressed(),
"dropping packet before nic because it is not fully decoded"
);
return None;
}
tracing::trace!(?packet, "send packet to nic channel");
// TODO: use a function to get the body ref directly for zero copy
let _ = self.nic_channel.send(packet).await;
@@ -989,6 +997,11 @@ impl PeerManager {
}
}
async fn get_peer_public_key(&self, peer_id: PeerId) -> Option<Vec<u8>> {
let peer_map = self.peers.upgrade()?;
peer_map.get_peer_public_key(peer_id)
}
async fn get_peer_identity_type(&self, peer_id: PeerId) -> Option<PeerIdentityType> {
let peer_map = self.peers.upgrade()?;
peer_map.get_peer_identity_type(peer_id)
+8 -7
View File
@@ -278,13 +278,9 @@ impl PeerMap {
pub async fn list_peers_with_conn(&self) -> Vec<PeerId> {
let mut ret = Vec::new();
let peers = self.list_peers();
for peer_id in peers.iter() {
let Some(peer) = self.get_peer_by_id(*peer_id) else {
continue;
};
if !peer.list_peer_conns().await.is_empty() {
ret.push(*peer_id);
for item in self.peer_map.iter() {
if item.value().has_live_conns() {
ret.push(*item.key());
}
}
ret
@@ -308,6 +304,11 @@ impl PeerMap {
.and_then(|p| p.get_peer_identity_type())
}
pub fn get_peer_public_key(&self, peer_id: PeerId) -> Option<Vec<u8>> {
self.get_peer_by_id(peer_id)
.and_then(|p| p.get_peer_public_key())
}
pub async fn close_peer_conn(
&self,
peer_id: PeerId,
+535 -189
View File
@@ -32,8 +32,12 @@ use tokio::{
use crate::{
common::{
config::NetworkIdentity, constants::EASYTIER_VERSION, global_ctx::ArcGlobalCtx,
shrink_dashmap, stun::StunInfoCollectorTrait, PeerId,
config::NetworkIdentity,
constants::EASYTIER_VERSION,
global_ctx::{ArcGlobalCtx, GlobalCtxEvent},
shrink_dashmap,
stun::StunInfoCollectorTrait,
PeerId,
},
peers::route_trait::{Route, RouteInterfaceBox},
proto::{
@@ -66,6 +70,8 @@ use super::{
PeerPacketFilter,
};
use atomic_shim::AtomicU64;
static SERVICE_ID: u32 = 7;
static UPDATE_PEER_INFO_PERIOD: Duration = Duration::from_secs(3600);
static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660);
@@ -371,6 +377,13 @@ impl Default for RouteConnInfo {
}
}
#[derive(Debug, Default)]
struct InterfacePeerSnapshot {
generation: u64,
peers: BTreeSet<PeerId>,
identity_types: BTreeMap<PeerId, Option<PeerIdentityType>>,
}
// constructed with all infos synced from all peers.
struct SyncedRouteInfo {
peer_infos: RwLock<OrderedHashMap<PeerId, RoutePeerInfo>>,
@@ -1049,14 +1062,12 @@ impl SyncedRouteInfo {
.unwrap_or(false)
}
fn get_credential_info(&self, peer_id: PeerId) -> Option<TrustedCredentialPubkey> {
let peer_infos = self.peer_infos.read();
let info = peer_infos.get(&peer_id)?;
if info.noise_static_pubkey.is_empty() {
fn get_credential_info_by_pubkey(&self, peer_pubkey: &[u8]) -> Option<TrustedCredentialPubkey> {
if peer_pubkey.is_empty() {
return None;
}
self.trusted_credential_pubkeys
.get(&info.noise_static_pubkey)
.get(peer_pubkey)
.map(|r| r.value().clone())
}
}
@@ -1800,6 +1811,9 @@ struct PeerRouteServiceImpl {
synced_route_info: SyncedRouteInfo,
cached_local_conn_map: std::sync::Mutex<RouteConnBitmap>,
cached_local_conn_map_version: AtomicVersion,
cached_interface_peer_snapshot: std::sync::Mutex<Arc<InterfacePeerSnapshot>>,
interface_peers_generation: AtomicU64,
applied_interface_peers_generation: AtomicU64,
last_update_my_foreign_network: AtomicCell<Option<std::time::Instant>>,
@@ -1858,6 +1872,11 @@ impl PeerRouteServiceImpl {
},
cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::default()),
cached_local_conn_map_version: AtomicVersion::new(),
cached_interface_peer_snapshot: std::sync::Mutex::new(Arc::new(
InterfacePeerSnapshot::default(),
)),
interface_peers_generation: AtomicU64::new(1),
applied_interface_peers_generation: AtomicU64::new(0),
last_update_my_foreign_network: AtomicCell::new(None),
@@ -1904,15 +1923,60 @@ impl PeerRouteServiceImpl {
self.sessions.iter().map(|x| *x.key()).collect()
}
pub fn mark_interface_peers_dirty(&self) {
self.interface_peers_generation
.fetch_add(1, Ordering::Relaxed);
}
async fn interface_peer_snapshot_uncached(&self) -> InterfacePeerSnapshot {
let interface = self.interface.lock().await;
let interface = interface.as_ref().unwrap();
let peers: BTreeSet<_> = interface.list_peers().await.into_iter().collect();
let mut identity_types = BTreeMap::new();
for peer_id in peers.iter().copied() {
identity_types.insert(peer_id, interface.get_peer_identity_type(peer_id).await);
}
InterfacePeerSnapshot {
generation: 0,
peers,
identity_types,
}
}
async fn interface_peer_snapshot(&self) -> Arc<InterfacePeerSnapshot> {
loop {
let start_generation = self.interface_peers_generation.load(Ordering::Acquire);
{
let cached = self.cached_interface_peer_snapshot.lock().unwrap();
if cached.generation == start_generation {
return cached.clone();
}
}
let mut snapshot = self.interface_peer_snapshot_uncached().await;
let end_generation = self.interface_peers_generation.load(Ordering::Acquire);
if start_generation == end_generation {
snapshot.generation = end_generation;
let snapshot = Arc::new(snapshot);
*self.cached_interface_peer_snapshot.lock().unwrap() = snapshot.clone();
return snapshot;
}
}
}
async fn list_peers_from_interface_snapshot(&self) -> (u64, BTreeSet<PeerId>) {
let snapshot = self.interface_peer_snapshot().await;
(snapshot.generation, snapshot.peers.clone())
}
async fn list_peers_from_interface<T: FromIterator<PeerId>>(&self) -> T {
self.interface
.lock()
self.interface_peer_snapshot()
.await
.as_ref()
.unwrap()
.list_peers()
.await
.into_iter()
.peers
.iter()
.copied()
.collect()
}
@@ -1920,6 +1984,11 @@ impl PeerRouteServiceImpl {
&self,
peer_id: PeerId,
) -> Option<PeerIdentityType> {
let snapshot = self.interface_peer_snapshot().await;
if let Some(identity_type) = snapshot.identity_types.get(&peer_id) {
return *identity_type;
}
self.interface
.lock()
.await
@@ -1929,6 +1998,16 @@ impl PeerRouteServiceImpl {
.await
}
async fn get_peer_public_key_from_interface(&self, peer_id: PeerId) -> Option<Vec<u8>> {
self.interface
.lock()
.await
.as_ref()
.unwrap()
.get_peer_public_key(peer_id)
.await
}
fn update_my_peer_info(&self) -> bool {
self.synced_route_info.update_my_peer_info(
self.my_peer_id,
@@ -1938,9 +2017,33 @@ impl PeerRouteServiceImpl {
}
async fn update_my_conn_info(&self) -> bool {
let connected_peers: BTreeSet<PeerId> = self.list_peers_from_interface().await;
self.synced_route_info
.update_my_conn_info(self.my_peer_id, connected_peers)
let current_generation = self.interface_peers_generation.load(Ordering::Acquire);
let generation_applied = self
.applied_interface_peers_generation
.load(Ordering::Acquire)
== current_generation;
if generation_applied {
let need_periodic_requery = self
.interface
.lock()
.await
.as_ref()
.map(|x| x.need_periodic_requery_peers())
.unwrap_or(false);
if !need_periodic_requery {
return false;
}
self.mark_interface_peers_dirty();
}
let (generation, connected_peers) = self.list_peers_from_interface_snapshot().await;
let updated = self
.synced_route_info
.update_my_conn_info(self.my_peer_id, connected_peers);
self.applied_interface_peers_generation
.store(generation, Ordering::Release);
updated
}
async fn update_my_foreign_network(&self) -> bool {
@@ -2048,6 +2151,18 @@ impl PeerRouteServiceImpl {
.unwrap_or(false)
}
fn handle_global_ctx_event(&self, event: &GlobalCtxEvent) {
if matches!(
event,
GlobalCtxEvent::PeerAdded(_)
| GlobalCtxEvent::PeerRemoved(_)
| GlobalCtxEvent::PeerConnAdded(_)
| GlobalCtxEvent::PeerConnRemoved(_)
) {
self.mark_interface_peers_dirty();
}
}
fn update_route_table_and_cached_local_conn_bitmap(&self) {
self.update_peer_info_last_update();
@@ -2283,15 +2398,7 @@ impl PeerRouteServiceImpl {
let my_foreign_network_updated = self.update_my_foreign_network().await;
let mut untrusted_changed = false;
if my_peer_info_updated {
let network_identity = self.global_ctx.get_network_identity();
let network_secret = network_identity.network_secret.as_deref();
let (untrusted, global_trusted_keys) = self
.synced_route_info
.verify_and_update_credential_trusts(network_secret);
self.global_ctx
.update_trusted_keys(global_trusted_keys, &network_identity.network_name);
self.disconnect_untrusted_peers(&untrusted).await;
untrusted_changed = !untrusted.is_empty();
untrusted_changed = self.refresh_credential_trusts_and_disconnect().await;
}
if my_peer_info_updated || my_conn_info_updated || untrusted_changed {
@@ -2304,6 +2411,18 @@ impl PeerRouteServiceImpl {
my_peer_info_updated || my_conn_info_updated || my_foreign_network_updated
}
async fn refresh_credential_trusts_and_disconnect(&self) -> bool {
let network_identity = self.global_ctx.get_network_identity();
let network_secret = network_identity.network_secret.as_deref();
let (untrusted, global_trusted_keys) = self
.synced_route_info
.verify_and_update_credential_trusts(network_secret);
self.global_ctx
.update_trusted_keys(global_trusted_keys, &network_identity.network_name);
self.disconnect_untrusted_peers(&untrusted).await;
!untrusted.is_empty()
}
async fn disconnect_untrusted_peers(&self, untrusted_peers: &[PeerId]) {
if untrusted_peers.is_empty() {
return;
@@ -2364,7 +2483,7 @@ impl PeerRouteServiceImpl {
}
}
fn clear_expired_peer(&self) {
async fn clear_expired_peer(&self) {
let now = SystemTime::now();
let mut to_remove = Vec::new();
for (peer_id, peer_info) in self.synced_route_info.peer_infos.read().iter() {
@@ -2404,6 +2523,7 @@ impl PeerRouteServiceImpl {
self.synced_route_info.foreign_network.remove(p);
}
self.refresh_credential_trusts_and_disconnect().await;
self.route_table.clean_expired_route_info();
self.route_table_with_cost.clean_expired_route_info();
}
@@ -2789,10 +2909,9 @@ impl RouteSessionManager {
_ = recv.recv() => {}
}
let mut peers = service_impl.list_peers_from_interface::<Vec<_>>().await;
peers.sort();
let session_peers = self.list_session_peers();
let interface_snapshot = service_impl.interface_peer_snapshot().await;
let peers = &interface_snapshot.peers;
let session_peers = self.list_session_peer_set();
for peer_id in session_peers.iter() {
if !peers.contains(peer_id) {
if Some(*peer_id) == cur_dst_peer_id_to_initiate {
@@ -2808,9 +2927,11 @@ impl RouteSessionManager {
// Step 9a: Filter OSPF session candidates based on direct auth level.
// - Credential nodes only initiate sessions to admin nodes (not other credential nodes)
// - Admin nodes don't initiate sessions to credential nodes
let identity_type = service_impl
.get_peer_identity_type_from_interface(peer_id)
.await
let identity_type = interface_snapshot
.identity_types
.get(&peer_id)
.copied()
.flatten()
.unwrap_or(PeerIdentityType::Admin);
if matches!(identity_type, PeerIdentityType::Credential) {
continue;
@@ -2896,6 +3017,14 @@ impl RouteSessionManager {
service_impl.list_session_peers()
}
fn list_session_peer_set(&self) -> BTreeSet<PeerId> {
let Some(service_impl) = self.service_impl.upgrade() else {
return BTreeSet::new();
};
service_impl.list_session_peers().into_iter().collect()
}
fn dump_sessions(&self) -> Result<String, Error> {
let Some(service_impl) = self.service_impl.upgrade() else {
return Err(Error::Stopped);
@@ -2919,6 +3048,33 @@ impl RouteSessionManager {
tracing::debug!(?ret, ?reason, "sync_now_broadcast.send");
}
fn extract_credential_peer_info(
&self,
from_peer_id: PeerId,
peer_infos: &[RoutePeerInfo],
raw_peer_infos: &[DynamicMessage],
credential: &TrustedCredentialPubkey,
) -> Option<(RoutePeerInfo, DynamicMessage)> {
let info_idx = peer_infos.iter().position(|p| p.peer_id == from_peer_id)?;
let mut info = peer_infos[info_idx].clone();
let mut raw_info = raw_peer_infos[info_idx].clone();
let allowed_cidrs = &credential.allowed_proxy_cidrs;
// Filter proxy_cidrs to only those allowed by credential
if !allowed_cidrs.is_empty() {
info.proxy_cidrs.retain(|cidr| {
allowed_cidrs
.iter()
.any(|allowed| cidr_is_subset_str(cidr, allowed))
});
} else {
// No allowed_proxy_cidrs → no proxy_cidrs allowed
info.proxy_cidrs.clear();
}
SyncedRouteInfo::mark_credential_peer(&mut info, true);
patch_raw_from_info(&mut raw_info, &info, &["proxy_cidrs", "feature_flag"]);
Some((info, raw_info))
}
#[allow(clippy::too_many_arguments)]
async fn do_sync_route_info(
&self,
@@ -2942,7 +3098,22 @@ impl RouteSessionManager {
.await
.unwrap_or(PeerIdentityType::Admin);
let from_is_credential = matches!(from_identity_type, PeerIdentityType::Credential);
let from_is_shared = matches!(from_identity_type, PeerIdentityType::SharedNode);
let credential_info = if from_is_credential {
service_impl
.get_peer_public_key_from_interface(from_peer_id)
.await
.and_then(|pubkey| {
service_impl
.synced_route_info
.get_credential_info_by_pubkey(&pubkey)
})
} else {
None
};
if from_is_credential && credential_info.is_none() {
// no credential found
return Err(Error::Stopped);
}
let _session_lock = session.lock.lock();
@@ -2955,108 +3126,44 @@ impl RouteSessionManager {
if let Some(peer_infos) = &peer_infos {
// Step 9b: credential peers can only propagate their own route info
let normalize_raw = |info: &RoutePeerInfo| {
let mut raw = DynamicMessage::new(RoutePeerInfo::default().descriptor());
raw.transcode_from(info).unwrap();
raw
};
let normalized_peer_infos: Vec<RoutePeerInfo>;
let normalized_raw_peer_infos: Vec<DynamicMessage>;
// patch_raw_from_info(&mut raw, info, &["proxy_cidrs", "feature_flag"]);
let (pi, rpi) = if from_is_credential {
let allowed_cidrs = service_impl
.synced_route_info
.get_credential_info(from_peer_id)
.map(|tc| tc.allowed_proxy_cidrs.clone())
.unwrap_or_default();
normalized_peer_infos = peer_infos
.iter()
.filter(|info| info.peer_id == from_peer_id)
.cloned()
.map(|mut info| {
// Filter proxy_cidrs to only those allowed by credential
if !allowed_cidrs.is_empty() {
info.proxy_cidrs.retain(|cidr| {
allowed_cidrs
.iter()
.any(|allowed| cidr_is_subset_str(cidr, allowed))
});
} else {
// No allowed_proxy_cidrs → no proxy_cidrs allowed
info.proxy_cidrs.clear();
}
SyncedRouteInfo::mark_credential_peer(&mut info, true);
info
})
.collect();
normalized_raw_peer_infos = normalized_peer_infos
.iter()
.map(|info| {
// Find original raw for this peer to preserve unknown fields
let orig_idx = peer_infos.iter().position(|p| p.peer_id == info.peer_id);
let mut raw = orig_idx
.and_then(|idx| raw_peer_infos.as_ref().map(|rpi| rpi[idx].clone()))
.unwrap_or_else(|| normalize_raw(info));
patch_raw_from_info(&mut raw, info, &["proxy_cidrs", "feature_flag"]);
raw
})
.collect();
(&normalized_peer_infos, &normalized_raw_peer_infos)
if let Some(ret) = self.extract_credential_peer_info(
from_peer_id,
peer_infos,
raw_peer_infos.as_deref().unwrap(),
credential_info.as_ref().unwrap(),
) {
(&vec![ret.0], &vec![ret.1])
} else {
(&vec![], &vec![])
}
} else {
let mut peer_infos_mut = peer_infos.clone();
let mut raw_peer_infos_mut = raw_peer_infos
.as_ref()
.cloned()
.unwrap_or_else(|| peer_infos_mut.iter().map(normalize_raw).collect());
if from_is_shared {
for (info, raw) in peer_infos_mut.iter_mut().zip(raw_peer_infos_mut.iter_mut())
{
info.trusted_credential_pubkeys.clear();
patch_raw_from_info(raw, info, &["trusted_credential_pubkeys"]);
}
}
if let Some((idx, info)) = peer_infos_mut
.iter()
.enumerate()
.find(|(_, info)| info.peer_id == from_peer_id)
{
let mut info = info.clone();
SyncedRouteInfo::mark_credential_peer(&mut info, false);
peer_infos_mut[idx] = info.clone();
patch_raw_from_info(&mut raw_peer_infos_mut[idx], &info, &["feature_flag"]);
}
normalized_peer_infos = peer_infos_mut;
normalized_raw_peer_infos = raw_peer_infos_mut;
(&normalized_peer_infos, &normalized_raw_peer_infos)
(peer_infos, raw_peer_infos.as_ref().unwrap())
};
service_impl.synced_route_info.update_peer_infos(
my_peer_id,
service_impl.my_peer_route_id,
from_peer_id,
pi,
rpi,
)?;
service_impl
.synced_route_info
.verify_and_update_group_trusts(
if !pi.is_empty() {
service_impl.synced_route_info.update_peer_infos(
my_peer_id,
service_impl.my_peer_route_id,
from_peer_id,
pi,
&service_impl.global_ctx.get_acl_group_declarations(),
);
session.update_dst_saved_peer_info_version(pi, from_peer_id);
need_update_route_table = true;
rpi,
)?;
service_impl
.synced_route_info
.verify_and_update_group_trusts(
pi,
&service_impl.global_ctx.get_acl_group_declarations(),
);
session.update_dst_saved_peer_info_version(pi, from_peer_id);
need_update_route_table = true;
}
}
// Step 9b: credential peers' conn_info depends on allow_relay flag
if let Some(conn_info) = &conn_info {
let accept_conn_info = if from_is_credential {
service_impl
.synced_route_info
.get_credential_info(from_peer_id)
.map(|tc| tc.allow_relay)
.unwrap_or(false)
} else {
true
};
let accept_conn_info =
!from_is_credential || credential_info.map(|tc| tc.allow_relay).unwrap_or(false);
if accept_conn_info {
service_impl.synced_route_info.update_conn_info(conn_info);
session.update_dst_saved_conn_info_version(conn_info, from_peer_id);
@@ -3162,7 +3269,7 @@ impl PeerRoute {
async fn clear_expired_peer(service_impl: Arc<PeerRouteServiceImpl>) {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
service_impl.clear_expired_peer();
service_impl.clear_expired_peer().await;
// TODO: use debug log level for this.
tracing::debug!(?service_impl, "clear_expired_peer");
}
@@ -3180,6 +3287,7 @@ impl PeerRoute {
session_mgr: RouteSessionManager,
) {
let mut global_event_receiver = service_impl.global_ctx.subscribe();
service_impl.mark_interface_peers_dirty();
loop {
if service_impl.update_my_infos().await {
session_mgr.sync_now("update_my_infos");
@@ -3193,6 +3301,12 @@ impl PeerRoute {
select! {
ev = global_event_receiver.recv() => {
if let Ok(ev_ref) = &ev {
service_impl.handle_global_ctx_event(ev_ref);
} else {
service_impl.mark_interface_peers_dirty();
global_event_receiver = global_event_receiver.resubscribe();
}
tracing::info!(?ev, "global event received in update_my_peer_info_routine");
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {}
@@ -3448,18 +3562,25 @@ impl PeerPacketFilter for Arc<PeerRoute> {}
#[cfg(test)]
mod tests {
use std::{
collections::BTreeSet,
sync::{atomic::Ordering, Arc},
time::Duration,
collections::{BTreeSet, HashMap},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::{Duration, SystemTime},
};
use cidr::{Ipv4Cidr, Ipv4Inet, Ipv6Inet};
use dashmap::DashMap;
use parking_lot::Mutex;
use prefix_trie::PrefixMap;
use prost_reflect::{DynamicMessage, ReflectMessage};
use crate::{
common::{global_ctx::tests::get_mock_global_ctx, PeerId},
common::{
global_ctx::{tests::get_mock_global_ctx, GlobalCtxEvent, TrustedKeySource},
PeerId,
},
connector::udp_hole_punch::tests::replace_stun_info_collector,
peers::{
create_packet_recv_chan,
@@ -3479,11 +3600,12 @@ mod tests {
};
use prost::Message;
use super::PeerRoute;
use super::{PeerRoute, REMOVE_DEAD_PEER_INFO_AFTER};
struct AuthOnlyInterface {
my_peer_id: PeerId,
identity_type: DashMap<PeerId, PeerIdentityType>,
peer_public_key: DashMap<PeerId, Vec<u8>>,
}
#[async_trait::async_trait]
@@ -3496,11 +3618,183 @@ mod tests {
self.my_peer_id
}
async fn get_peer_public_key(&self, peer_id: PeerId) -> Option<Vec<u8>> {
self.peer_public_key
.get(&peer_id)
.map(|x| x.value().clone())
}
async fn get_peer_identity_type(&self, peer_id: PeerId) -> Option<PeerIdentityType> {
self.identity_type.get(&peer_id).map(|x| *x.value())
}
}
struct TrackingInterface {
my_peer_id: PeerId,
closed_peers: Arc<Mutex<Vec<PeerId>>>,
}
#[async_trait::async_trait]
impl RouteInterface for TrackingInterface {
async fn list_peers(&self) -> Vec<PeerId> {
Vec::new()
}
fn my_peer_id(&self) -> PeerId {
self.my_peer_id
}
async fn close_peer(&self, peer_id: PeerId) {
self.closed_peers.lock().push(peer_id);
}
}
struct CountingInterface {
my_peer_id: PeerId,
peers: Arc<Mutex<Vec<PeerId>>>,
peer_identity_types: Arc<Mutex<HashMap<PeerId, Option<PeerIdentityType>>>>,
list_peers_calls: Arc<AtomicU32>,
get_peer_identity_type_calls: Arc<AtomicU32>,
}
#[async_trait::async_trait]
impl RouteInterface for CountingInterface {
async fn list_peers(&self) -> Vec<PeerId> {
self.list_peers_calls.fetch_add(1, Ordering::Relaxed);
self.peers.lock().clone()
}
async fn get_peer_identity_type(&self, peer_id: PeerId) -> Option<PeerIdentityType> {
self.get_peer_identity_type_calls
.fetch_add(1, Ordering::Relaxed);
self.peer_identity_types
.lock()
.get(&peer_id)
.copied()
.flatten()
}
fn my_peer_id(&self) -> PeerId {
self.my_peer_id
}
}
#[tokio::test]
async fn interface_peer_cache_refreshes_only_when_marked_dirty() {
let service_impl = PeerRouteServiceImpl::new(1, get_mock_global_ctx());
let peers = Arc::new(Mutex::new(vec![2, 3]));
let peer_identity_types = Arc::new(Mutex::new(HashMap::new()));
let list_peers_calls = Arc::new(AtomicU32::new(0));
let get_peer_identity_type_calls = Arc::new(AtomicU32::new(0));
*service_impl.interface.lock().await = Some(Box::new(CountingInterface {
my_peer_id: 1,
peers: peers.clone(),
peer_identity_types,
list_peers_calls: list_peers_calls.clone(),
get_peer_identity_type_calls,
}));
let first: BTreeSet<_> = service_impl.list_peers_from_interface().await;
let second: BTreeSet<_> = service_impl.list_peers_from_interface().await;
assert_eq!(first, BTreeSet::from([2, 3]));
assert_eq!(second, BTreeSet::from([2, 3]));
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1);
*peers.lock() = vec![2, 4];
service_impl.handle_global_ctx_event(&GlobalCtxEvent::PeerConnAdded(Default::default()));
let third: BTreeSet<_> = service_impl.list_peers_from_interface().await;
assert_eq!(third, BTreeSet::from([2, 4]));
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2);
}
#[tokio::test]
async fn update_my_conn_info_skips_interface_scan_when_topology_is_unchanged() {
let service_impl = PeerRouteServiceImpl::new(1, get_mock_global_ctx());
let peers = Arc::new(Mutex::new(vec![2, 3]));
let peer_identity_types = Arc::new(Mutex::new(HashMap::new()));
let list_peers_calls = Arc::new(AtomicU32::new(0));
let get_peer_identity_type_calls = Arc::new(AtomicU32::new(0));
*service_impl.interface.lock().await = Some(Box::new(CountingInterface {
my_peer_id: 1,
peers: peers.clone(),
peer_identity_types,
list_peers_calls: list_peers_calls.clone(),
get_peer_identity_type_calls: get_peer_identity_type_calls.clone(),
}));
assert!(service_impl.update_my_conn_info().await);
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1);
assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 2);
assert!(!service_impl.update_my_conn_info().await);
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1);
assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 2);
*peers.lock() = vec![2, 4];
service_impl.handle_global_ctx_event(&GlobalCtxEvent::PeerConnRemoved(Default::default()));
assert!(service_impl.update_my_conn_info().await);
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2);
assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 4);
assert!(!service_impl.update_my_conn_info().await);
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2);
assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 4);
}
#[tokio::test]
async fn get_peer_identity_type_reuses_snapshot_until_topology_changes() {
let service_impl = PeerRouteServiceImpl::new(1, get_mock_global_ctx());
let peers = Arc::new(Mutex::new(vec![2, 3]));
let peer_identity_types = Arc::new(Mutex::new(HashMap::from([
(2, Some(PeerIdentityType::Credential)),
(3, Some(PeerIdentityType::Admin)),
(4, Some(PeerIdentityType::Admin)),
])));
let list_peers_calls = Arc::new(AtomicU32::new(0));
let get_peer_identity_type_calls = Arc::new(AtomicU32::new(0));
*service_impl.interface.lock().await = Some(Box::new(CountingInterface {
my_peer_id: 1,
peers: peers.clone(),
peer_identity_types: peer_identity_types.clone(),
list_peers_calls: list_peers_calls.clone(),
get_peer_identity_type_calls: get_peer_identity_type_calls.clone(),
}));
assert_eq!(
service_impl.get_peer_identity_type_from_interface(2).await,
Some(PeerIdentityType::Credential)
);
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1);
assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 2);
assert_eq!(
service_impl.get_peer_identity_type_from_interface(2).await,
Some(PeerIdentityType::Credential)
);
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 1);
assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 2);
*peers.lock() = vec![2, 4];
service_impl.handle_global_ctx_event(&GlobalCtxEvent::PeerConnRemoved(Default::default()));
assert_eq!(
service_impl.get_peer_identity_type_from_interface(4).await,
Some(PeerIdentityType::Admin)
);
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2);
assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 4);
assert_eq!(
service_impl.get_peer_identity_type_from_interface(4).await,
Some(PeerIdentityType::Admin)
);
assert_eq!(list_peers_calls.load(Ordering::Relaxed), 2);
assert_eq!(get_peer_identity_type_calls.load(Ordering::Relaxed), 4);
}
async fn create_mock_route(peer_mgr: Arc<PeerManager>) -> Arc<PeerRoute> {
let peer_route = PeerRoute::new(
peer_mgr.my_peer_id(),
@@ -3654,13 +3948,29 @@ mod tests {
let route = create_mock_route(peer_mgr.clone()).await;
let from_peer_id: PeerId = 10001;
let forwarded_peer_id: PeerId = 10002;
let credential_pubkey = vec![3u8; 32];
let identity_type = DashMap::new();
identity_type.insert(from_peer_id, PeerIdentityType::Credential);
let peer_public_key = DashMap::new();
peer_public_key.insert(from_peer_id, credential_pubkey.clone());
*route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface {
my_peer_id: peer_mgr.my_peer_id(),
identity_type,
peer_public_key,
}));
route
.service_impl
.synced_route_info
.trusted_credential_pubkeys
.insert(
credential_pubkey.clone(),
TrustedCredentialPubkey {
pubkey: credential_pubkey,
expiry_unix: i64::MAX,
..Default::default()
},
);
let mut sender_info = RoutePeerInfo::new();
sender_info.peer_id = from_peer_id;
@@ -3703,6 +4013,7 @@ mod tests {
assert!(guard.get(&forwarded_peer_id).is_none());
}
// shared node doesn't have hmac.
#[tokio::test]
async fn sync_route_info_shared_sender_cannot_publish_trusted_credentials() {
let peer_mgr = create_mock_pmgr().await;
@@ -3716,6 +4027,7 @@ mod tests {
*route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface {
my_peer_id: peer_mgr.my_peer_id(),
identity_type,
peer_public_key: DashMap::new(),
}));
let mut sender_info = RoutePeerInfo::new();
@@ -3755,13 +4067,6 @@ mod tests {
.await
.unwrap();
let guard = route.service_impl.synced_route_info.peer_infos.read();
assert!(guard
.get(&forwarded_peer_id)
.map(|x| x.trusted_credential_pubkeys.is_empty())
.unwrap_or(false));
drop(guard);
assert!(!route
.service_impl
.synced_route_info
@@ -3770,60 +4075,83 @@ mod tests {
}
#[tokio::test]
async fn sync_route_info_forces_non_credential_for_legacy_admin_sender() {
let peer_mgr = create_mock_pmgr().await;
let route = create_mock_route(peer_mgr.clone()).await;
let from_peer_id: PeerId = 10011;
let other_peer_id: PeerId = 10012;
async fn clear_expired_peer_recomputes_trust_after_last_admin_disappears() {
let service_impl = PeerRouteServiceImpl::new(1, get_mock_global_ctx());
let admin_peer_id: PeerId = 10051;
let credential_peer_id: PeerId = 10052;
let admin_pubkey = vec![5u8; 32];
let credential_pubkey = vec![6u8; 32];
let network_name = service_impl
.global_ctx
.get_network_identity()
.network_name
.clone();
let now = SystemTime::now();
let closed_peers = Arc::new(Mutex::new(Vec::new()));
let identity_type = DashMap::new();
identity_type.insert(from_peer_id, PeerIdentityType::Admin);
*route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface {
my_peer_id: peer_mgr.my_peer_id(),
identity_type,
*service_impl.interface.lock().await = Some(Box::new(TrackingInterface {
my_peer_id: service_impl.my_peer_id,
closed_peers: closed_peers.clone(),
}));
let mut sender_info = RoutePeerInfo::new();
sender_info.peer_id = from_peer_id;
sender_info.version = 1;
sender_info.feature_flag = Some(PeerFeatureFlag {
is_credential_peer: true,
..Default::default()
});
{
let mut guard = service_impl.synced_route_info.peer_infos.write();
let mut other_info = RoutePeerInfo::new();
other_info.peer_id = other_peer_id;
other_info.version = 1;
let mut admin_info = RoutePeerInfo::new();
admin_info.peer_id = admin_peer_id;
admin_info.version = 1;
admin_info.last_update =
Some((now - REMOVE_DEAD_PEER_INFO_AFTER - Duration::from_secs(1)).into());
admin_info.noise_static_pubkey = admin_pubkey;
admin_info.trusted_credential_pubkeys = vec![TrustedCredentialPubkeyProof {
credential: Some(TrustedCredentialPubkey {
pubkey: credential_pubkey.clone(),
expiry_unix: i64::MAX,
..Default::default()
}),
credential_hmac: vec![1; 32],
}];
let make_raw = |info: &RoutePeerInfo| {
let mut raw = DynamicMessage::new(RoutePeerInfo::default().descriptor());
raw.transcode_from(info).unwrap();
raw
};
let raw_infos = vec![make_raw(&sender_info), make_raw(&other_info)];
let mut credential_info = RoutePeerInfo::new();
credential_info.peer_id = credential_peer_id;
credential_info.version = 1;
credential_info.last_update = Some(now.into());
credential_info.noise_static_pubkey = credential_pubkey.clone();
route
.session_mgr
.do_sync_route_info(
from_peer_id,
1,
true,
Some(vec![sender_info, other_info]),
Some(raw_infos),
None,
None,
)
.await
.unwrap();
guard.insert(admin_peer_id, admin_info);
guard.insert(credential_peer_id, credential_info);
}
let guard = route.service_impl.synced_route_info.peer_infos.read();
let sender = guard.get(&from_peer_id).unwrap();
assert!(!sender
.feature_flag
.as_ref()
.map(|x| x.is_credential_peer)
.unwrap_or(false));
assert!(guard.get(&other_peer_id).is_some());
let (_, global_trusted_keys) = service_impl
.synced_route_info
.verify_and_update_credential_trusts(None);
service_impl
.global_ctx
.update_trusted_keys(global_trusted_keys, &network_name);
assert!(service_impl
.synced_route_info
.trusted_credential_pubkeys
.contains_key(&credential_pubkey));
service_impl.clear_expired_peer().await;
assert!(!service_impl.global_ctx.is_pubkey_trusted_with_source(
&credential_pubkey,
&network_name,
TrustedKeySource::OspfCredential,
));
assert!(closed_peers.lock().contains(&credential_peer_id));
assert!(!service_impl
.synced_route_info
.peer_infos
.read()
.contains_key(&admin_peer_id));
assert!(!service_impl
.synced_route_info
.peer_infos
.read()
.contains_key(&credential_peer_id));
}
#[rstest::rstest]
@@ -4453,13 +4781,29 @@ mod tests {
let peer_mgr = create_mock_pmgr().await;
let route = create_mock_route(peer_mgr.clone()).await;
let from_peer_id: PeerId = 20001;
let credential_pubkey = vec![4u8; 32];
let identity_type = DashMap::new();
identity_type.insert(from_peer_id, PeerIdentityType::Credential);
let peer_public_key = DashMap::new();
peer_public_key.insert(from_peer_id, credential_pubkey.clone());
*route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface {
my_peer_id: peer_mgr.my_peer_id(),
identity_type,
peer_public_key,
}));
route
.service_impl
.synced_route_info
.trusted_credential_pubkeys
.insert(
credential_pubkey.clone(),
TrustedCredentialPubkey {
pubkey: credential_pubkey,
expiry_unix: i64::MAX,
..Default::default()
},
);
let mut sender_info = RoutePeerInfo::new();
sender_info.peer_id = from_peer_id;
@@ -4505,6 +4849,7 @@ mod tests {
*route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface {
my_peer_id: peer_mgr.my_peer_id(),
identity_type,
peer_public_key: DashMap::new(),
}));
let mut sender_info = RoutePeerInfo::new();
@@ -4575,6 +4920,7 @@ mod tests {
*route.service_impl.interface.lock().await = Some(Box::new(AuthOnlyInterface {
my_peer_id: peer_mgr.my_peer_id(),
identity_type,
peer_public_key: DashMap::new(),
}));
let mut sender_info = RoutePeerInfo::new();
+203 -23
View File
@@ -212,7 +212,12 @@ impl PeerSessionStore {
.clone();
session.check_encrypt_algo_same(&send_algorithm, &recv_algorithm)?;
session.check_or_set_peer_static_pubkey(peer_static_pubkey)?;
session.sync_root_key(root_key, b_session_generation, initial_epoch);
session.sync_root_key(
root_key,
b_session_generation,
initial_epoch,
matches!(action, PeerSessionAction::Sync),
);
Ok(session)
}
}
@@ -352,6 +357,33 @@ impl EpochRxSlot {
}
}
#[derive(Debug, Clone, Copy, Default)]
struct SyncRxGrace {
slots: [[EpochRxSlot; 2]; 2],
expires_at_ms: u64,
valid: bool,
}
impl SyncRxGrace {
fn clear(&mut self) {
self.slots = [[EpochRxSlot::default(), EpochRxSlot::default()]; 2];
self.expires_at_ms = 0;
self.valid = false;
}
fn refresh(&mut self, slots: [[EpochRxSlot; 2]; 2], expires_at_ms: u64) {
self.slots = slots;
self.expires_at_ms = expires_at_ms;
self.valid = true;
}
fn maybe_expire(&mut self, now_ms: u64) {
if self.valid && now_ms >= self.expires_at_ms {
self.clear();
}
}
}
pub struct PeerSession {
peer_id: PeerId,
root_key: RwLock<[u8; 32]>,
@@ -365,6 +397,8 @@ pub struct PeerSession {
rx_slots: Mutex<[[EpochRxSlot; 2]; 2]>,
key_cache: Mutex<[[EpochKeySlot; 2]; 2]>,
sync_rx_grace: Mutex<SyncRxGrace>,
sync_rx_grace_expires_at_ms: AtomicU64,
send_cipher_algorithm: String,
recv_cipher_algorithm: String,
@@ -389,6 +423,11 @@ impl std::fmt::Debug for PeerSession {
.field("send_packets_since_epoch", &self.send_packets_since_epoch)
.field("rx_slots", &self.rx_slots)
.field("key_cache", &self.key_cache)
.field("sync_rx_grace", &self.sync_rx_grace)
.field(
"sync_rx_grace_expires_at_ms",
&self.sync_rx_grace_expires_at_ms,
)
.field("send_cipher_algorithm", &self.send_cipher_algorithm)
.field("recv_cipher_algorithm", &self.recv_cipher_algorithm)
.finish()
@@ -405,6 +444,10 @@ impl PeerSession {
/// traffic may want to increase this value; low-latency or tightly
/// resource-constrained deployments may lower it.
const EVICT_IDLE_AFTER_MS: u64 = 30_000;
/// Keep the pre-sync receive windows alive briefly so in-flight packets
/// from the previous epochs are still accepted after a shared session is
/// synced in place by another connection.
const SYNC_RX_GRACE_AFTER_MS: u64 = 5_000;
/// Maximum number of packets to send in a single epoch before forcing
/// a key/epoch rotation.
@@ -458,6 +501,8 @@ impl PeerSession {
send_packets_since_epoch: AtomicU64::new(0),
rx_slots: Mutex::new(rx_slots),
key_cache: Mutex::new(key_cache),
sync_rx_grace: Mutex::new(SyncRxGrace::default()),
sync_rx_grace_expires_at_ms: AtomicU64::new(0),
send_cipher_algorithm,
recv_cipher_algorithm,
invalidated: AtomicBool::new(false),
@@ -540,7 +585,15 @@ impl PeerSession {
Ok(())
}
pub fn sync_root_key(&self, root_key: [u8; 32], session_generation: u32, initial_epoch: u32) {
pub fn sync_root_key(
&self,
root_key: [u8; 32],
session_generation: u32,
initial_epoch: u32,
preserve_rx_grace: bool,
) {
let old_root_key = self.root_key();
let can_preserve_rx_grace = preserve_rx_grace && old_root_key == root_key;
{
let mut g = self.root_key.write().unwrap();
*g = root_key;
@@ -557,6 +610,16 @@ impl PeerSession {
{
let mut rx = self.rx_slots.lock().unwrap();
let mut sync_rx_grace = self.sync_rx_grace.lock().unwrap();
if can_preserve_rx_grace {
let expires_at_ms = now_ms().saturating_add(Self::SYNC_RX_GRACE_AFTER_MS);
sync_rx_grace.refresh(*rx, expires_at_ms);
self.sync_rx_grace_expires_at_ms
.store(expires_at_ms, Ordering::Relaxed);
} else {
sync_rx_grace.clear();
self.sync_rx_grace_expires_at_ms.store(0, Ordering::Relaxed);
}
for dir in 0..2 {
rx[dir][0].clear();
rx[dir][1].clear();
@@ -598,21 +661,17 @@ impl PeerSession {
key
}
fn get_encryptor(&self, epoch: u32, dir: usize, is_send: bool) -> Option<Arc<dyn Encryptor>> {
let generation = self.session_generation();
let rx = self.rx_slots.lock().unwrap();
let send_epoch = self.send_epoch.load(Ordering::Relaxed);
let allowed = epoch == send_epoch
|| rx[dir][0].valid && rx[dir][0].epoch == epoch
|| rx[dir][1].valid && rx[dir][1].epoch == epoch;
if !allowed {
return None;
}
fn get_or_create_encryptor(
&self,
epoch: u32,
dir: usize,
generation: u32,
is_send: bool,
) -> Arc<dyn Encryptor> {
let mut guard = self.key_cache.lock().unwrap();
for slot in guard[dir].iter_mut() {
if slot.valid && slot.epoch == epoch && slot.generation == generation {
return Some(slot.get_encryptor(is_send));
return slot.get_encryptor(is_send);
}
}
@@ -635,7 +694,7 @@ impl PeerSession {
guard[dir][1] = slot;
}
Some(ret)
ret
}
fn maybe_rotate_epoch(&self, now_ms: u64) {
@@ -698,9 +757,38 @@ impl PeerSession {
}
}
fn epoch_in_slots(slots: &[EpochRxSlot; 2], epoch: u32) -> bool {
slots[0].valid && slots[0].epoch == epoch || slots[1].valid && slots[1].epoch == epoch
}
fn sync_rx_grace_active(&self, now_ms: u64) -> bool {
let expires_at_ms = self.sync_rx_grace_expires_at_ms.load(Ordering::Relaxed);
if expires_at_ms == 0 {
return false;
}
if now_ms < expires_at_ms {
return true;
}
self.sync_rx_grace_expires_at_ms.store(0, Ordering::Relaxed);
false
}
fn check_replay(&self, epoch: u32, seq: u64, dir: usize, now_ms: u64) -> bool {
let mut rx = self.rx_slots.lock().unwrap();
Self::evict_old_rx_slots(&mut rx, now_ms);
let mut sync_rx_grace = if self.sync_rx_grace_active(now_ms) {
let mut sync_rx_grace = self.sync_rx_grace.lock().unwrap();
sync_rx_grace.maybe_expire(now_ms);
if sync_rx_grace.valid {
Self::evict_old_rx_slots(&mut sync_rx_grace.slots, now_ms);
Some(sync_rx_grace)
} else {
self.sync_rx_grace_expires_at_ms.store(0, Ordering::Relaxed);
None
}
} else {
None
};
let send_epoch = self.send_epoch.load(Ordering::Relaxed);
{
let mut key_cache = self.key_cache.lock().unwrap();
@@ -712,7 +800,10 @@ impl PeerSession {
let e = key_cache[d][s].epoch;
let allowed = e == send_epoch
|| rx[d][0].valid && rx[d][0].epoch == e
|| rx[d][1].valid && rx[d][1].epoch == e;
|| rx[d][1].valid && rx[d][1].epoch == e
|| sync_rx_grace
.as_ref()
.is_some_and(|g| Self::epoch_in_slots(&g.slots[d], e));
if !allowed {
key_cache[d][s].valid = false;
}
@@ -720,6 +811,18 @@ impl PeerSession {
}
}
if sync_rx_grace
.as_ref()
.is_some_and(|g| Self::epoch_in_slots(&g.slots[dir], epoch))
{
for slot in sync_rx_grace.as_mut().unwrap().slots[dir].iter_mut() {
if slot.valid && slot.epoch == epoch {
slot.last_rx_ms = now_ms;
return slot.window.accept(seq);
}
}
}
if !rx[dir][0].valid {
rx[dir][0] = EpochRxSlot {
epoch,
@@ -777,9 +880,7 @@ impl PeerSession {
}
let dir = Self::dir_for_sender(sender_peer_id, receiver_peer_id);
let (epoch, _seq, nonce_bytes) = self.next_nonce(dir);
let encryptor = self
.get_encryptor(epoch, dir, true)
.ok_or_else(|| anyhow!("no key for epoch"))?;
let encryptor = self.get_or_create_encryptor(epoch, dir, self.session_generation(), true);
if let Err(e) = encryptor.encrypt_with_nonce(pkt, Some(nonce_bytes.as_slice())) {
tracing::warn!(
peer_id = ?self.peer_id,
@@ -816,9 +917,7 @@ impl PeerSession {
));
}
let encryptor = self
.get_encryptor(epoch, dir, false)
.ok_or_else(|| anyhow!("no key for epoch"))?;
let encryptor = self.get_or_create_encryptor(epoch, dir, self.session_generation(), false);
if let Err(e) = encryptor.decrypt(ciphertext_with_tail) {
let count = self.decrypt_fail_count.fetch_add(1, Ordering::Relaxed) + 1;
if count >= Self::DECRYPT_FAIL_THRESHOLD {
@@ -974,7 +1073,7 @@ mod tests {
assert!(s.check_replay(0, 1, 0, now));
// Sync with initial_epoch=2 (simulating a Sync action)
s.sync_root_key(root_key, 2, 2);
s.sync_root_key(root_key, 2, 2, true);
// Remote peer is still sending at epoch 0 — should be accepted
// (rx_slots were cleared, so the first packet establishes the epoch)
@@ -983,4 +1082,85 @@ mod tests {
"packets at old epoch should be accepted after sync"
);
}
#[test]
fn sync_root_key_keeps_previous_epochs_during_grace_window() {
let peer_id: PeerId = 10;
let root_key = PeerSession::new_root_key();
let s = PeerSession::new(
peer_id,
root_key,
1,
0,
"aes-256-gcm".to_string(),
"aes-256-gcm".to_string(),
None,
);
let now = now_ms();
assert!(s.check_replay(0, 0, 0, now));
assert!(s.check_replay(1, 0, 0, now + 1));
s.sync_root_key(root_key, 2, 2, true);
// The first packet after sync may already use the new epoch.
assert!(s.check_replay(2, 0, 0, now + 2));
// Older in-flight packets from pre-sync epochs should still be accepted
// during the grace period, regardless of arrival order.
assert!(s.check_replay(1, 1, 0, now + 3));
assert!(s.check_replay(0, 1, 0, now + 4));
}
#[test]
fn sync_root_key_expires_previous_epochs_after_grace_window() {
let peer_id: PeerId = 10;
let root_key = PeerSession::new_root_key();
let s = PeerSession::new(
peer_id,
root_key,
1,
0,
"aes-256-gcm".to_string(),
"aes-256-gcm".to_string(),
None,
);
let now = now_ms();
assert!(s.check_replay(0, 0, 0, now));
assert!(s.check_replay(1, 0, 0, now + 1));
s.sync_root_key(root_key, 2, 2, true);
assert!(s.check_replay(2, 0, 0, now + 2));
assert!(
!s.check_replay(0, 1, 0, now + PeerSession::SYNC_RX_GRACE_AFTER_MS + 3),
"old epochs should stop being accepted once the sync grace window expires"
);
}
#[test]
fn sync_root_key_does_not_preserve_previous_epochs_when_root_key_changes() {
let peer_id: PeerId = 10;
let root_key = PeerSession::new_root_key();
let s = PeerSession::new(
peer_id,
root_key,
1,
0,
"aes-256-gcm".to_string(),
"aes-256-gcm".to_string(),
None,
);
let now = now_ms();
assert!(s.check_replay(0, 0, 0, now));
assert!(s.check_replay(1, 0, 0, now + 1));
s.sync_root_key(PeerSession::new_root_key(), 2, 2, true);
assert!(s.check_replay(2, 0, 0, now + 2));
assert!(
!s.check_replay(1, 1, 0, now + 3),
"old epochs should not be preserved when sync replaces the root key"
);
}
}
+6
View File
@@ -27,7 +27,13 @@ pub type ForeignNetworkRouteInfoMap =
pub trait RouteInterface {
async fn list_peers(&self) -> Vec<PeerId>;
fn my_peer_id(&self) -> PeerId;
fn need_periodic_requery_peers(&self) -> bool {
false
}
async fn close_peer(&self, _peer_id: PeerId) {}
async fn get_peer_public_key(&self, _peer_id: PeerId) -> Option<Vec<u8>> {
None
}
async fn get_peer_identity_type(&self, _peer_id: PeerId) -> Option<PeerIdentityType> {
None
}
+7
View File
@@ -4,6 +4,12 @@ import "common.proto";
package web;
message DeviceOsInfo {
string os_type = 1;
string version = 2;
string distribution = 3;
}
message HeartbeatRequest {
common.UUID machine_id = 1;
common.UUID inst_id = 2;
@@ -14,6 +20,7 @@ message HeartbeatRequest {
string hostname = 6;
repeated common.UUID running_network_instances = 7;
DeviceOsInfo device_os = 8;
}
message HeartbeatResponse {}
+16 -2
View File
@@ -57,6 +57,17 @@ fn get_faketcp_tunnel_type_str(driver_type: &str) -> String {
format!("faketcp_{}", driver_type)
}
async fn create_tun_off_runtime(
interface_name: String,
src_addr: Option<SocketAddr>,
dst_addr: SocketAddr,
) -> Result<Arc<dyn stack::Tun>, TunnelError> {
tokio::task::spawn_blocking(move || create_tun(&interface_name, src_addr, dst_addr))
.await
.map_err(|e| TunnelError::InternalError(format!("faketcp create_tun task failed: {e}")))?
.map_err(Into::into)
}
pub struct FakeTcpTunnelListener {
addr: url::Url,
os_listener: Option<tokio::net::TcpListener>,
@@ -137,7 +148,9 @@ impl FakeTcpTunnelListener {
let ret = match self.stack_map.entry(interface_name.to_string()) {
dashmap::Entry::Occupied(entry) => entry.get().clone(),
dashmap::Entry::Vacant(entry) => {
let tun = create_tun(interface_name, None, local_socket_addr)?;
let tun =
create_tun_off_runtime(interface_name.to_string(), None, local_socket_addr)
.await?;
tracing::info!(
?local_socket_addr,
"create new stack with interface_name: {:?}",
@@ -314,7 +327,8 @@ impl crate::tunnel::TunnelConnector for FakeTcpTunnelConnector {
IpAddr::V6(ip) => (None, Some(ip)),
};
let tun = create_tun(&interface_name, Some(remote_addr), local_addr)?;
let tun =
create_tun_off_runtime(interface_name.clone(), Some(remote_addr), local_addr).await?;
let local_ip = local_ip.unwrap_or("0.0.0.0".parse().unwrap());
let mut stack = stack::Stack::new(tun, local_ip, local_ip6, mac);
let driver_type = stack.driver_type();
@@ -367,7 +367,7 @@ fn read_packet_socket_stats(fd: i32) -> io::Result<PacketSocketStats> {
}
pub struct LinuxBpfTun {
fd: OwnedFd,
fd: Arc<OwnedFd>,
ifindex: i32,
stop: Arc<AtomicBool>,
worker: Option<std::thread::JoinHandle<()>>,
@@ -395,7 +395,7 @@ impl LinuxBpfTun {
if fd < 0 {
return Err(io::Error::last_os_error());
}
let fd = unsafe { OwnedFd::from_raw_fd(fd) };
let fd = Arc::new(unsafe { OwnedFd::from_raw_fd(fd) });
let mut addr: libc::sockaddr_ll = unsafe { mem::zeroed() };
addr.sll_family = libc::AF_PACKET as u16;
@@ -404,7 +404,7 @@ impl LinuxBpfTun {
let bind_ret = unsafe {
libc::bind(
fd.as_raw_fd(),
fd.as_ref().as_raw_fd(),
&addr as *const _ as *const libc::sockaddr,
mem::size_of::<libc::sockaddr_ll>() as u32,
)
@@ -413,7 +413,7 @@ impl LinuxBpfTun {
return Err(io::Error::last_os_error());
}
let actual_rcvbuf = set_socket_rcvbuf(fd.as_raw_fd(), DEFAULT_RCVBUF_BYTES)?;
let actual_rcvbuf = set_socket_rcvbuf(fd.as_ref().as_raw_fd(), DEFAULT_RCVBUF_BYTES)?;
let filter = build_tcp_filter(src_addr, dst_addr)?;
let mut prog = libc::sock_fprog {
@@ -425,7 +425,7 @@ impl LinuxBpfTun {
};
let opt_ret = unsafe {
libc::setsockopt(
fd.as_raw_fd(),
fd.as_ref().as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_ATTACH_FILTER,
&mut prog as *mut _ as *mut libc::c_void,
@@ -442,7 +442,7 @@ impl LinuxBpfTun {
};
let _ = unsafe {
libc::setsockopt(
fd.as_raw_fd(),
fd.as_ref().as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_RCVTIMEO,
&timeout as *const _ as *const libc::c_void,
@@ -453,10 +453,13 @@ impl LinuxBpfTun {
let stop = Arc::new(AtomicBool::new(false));
let (tx, rx) = tokio::sync::mpsc::channel(1024);
let stop_clone = stop.clone();
let read_fd = fd.as_raw_fd();
let read_fd = fd.as_ref().as_raw_fd();
let fd_guard = fd.clone();
let interface_name_for_worker = interface_name.to_string();
let worker = std::thread::spawn(move || {
// Keep the packet socket alive until the detached worker actually exits.
let _fd_guard = fd_guard;
let mut buf = vec![0u8; 65536];
let mut stats_enabled = true;
let mut total_packets: u64 = 0;
@@ -562,9 +565,11 @@ impl LinuxBpfTun {
impl Drop for LinuxBpfTun {
fn drop(&mut self) {
self.stop.store(true, AtomicOrdering::Relaxed);
let _ = unsafe { libc::shutdown(self.fd.as_raw_fd(), libc::SHUT_RD) };
let _ = unsafe { libc::shutdown(self.fd.as_ref().as_raw_fd(), libc::SHUT_RD) };
if let Some(worker) = self.worker.take() {
let _ = worker.join();
// Dropping the JoinHandle detaches the worker. The worker holds its own Arc<OwnedFd>
// clone, so the packet socket stays valid until recv wakes up and the thread exits.
drop(worker);
}
}
}
@@ -602,7 +607,7 @@ impl stack::Tun for LinuxBpfTun {
let ret = unsafe {
libc::sendto(
self.fd.as_raw_fd(),
self.fd.as_ref().as_raw_fd(),
packet.as_ptr() as *const libc::c_void,
packet.len(),
0,
+11 -2
View File
@@ -1,13 +1,16 @@
use std::sync::Arc;
use crate::{
instance_manager::NetworkInstanceManager, proto::rpc_impl::service_registry::ServiceRegistry,
rpc_service::api::register_api_rpc_service, web_client::WebClientHooks,
instance_manager::NetworkInstanceManager,
proto::{rpc_impl::service_registry::ServiceRegistry, web::DeviceOsInfo},
rpc_service::api::register_api_rpc_service,
web_client::WebClientHooks,
};
pub struct Controller {
token: String,
hostname: String,
device_os: DeviceOsInfo,
manager: Arc<NetworkInstanceManager>,
hooks: Arc<dyn WebClientHooks>,
}
@@ -16,12 +19,14 @@ impl Controller {
pub fn new(
token: String,
hostname: String,
device_os: DeviceOsInfo,
manager: Arc<NetworkInstanceManager>,
hooks: Arc<dyn WebClientHooks>,
) -> Self {
Controller {
token,
hostname,
device_os,
manager,
hooks,
}
@@ -39,6 +44,10 @@ impl Controller {
self.hostname.clone()
}
pub fn device_os(&self) -> DeviceOsInfo {
self.device_os.clone()
}
pub fn register_api_rpc_service(&self, registry: &ServiceRegistry) {
register_api_rpc_service(&self.manager, registry, Some(self.hooks.clone()));
}
+3 -2
View File
@@ -2,8 +2,8 @@ use std::sync::Arc;
use crate::{
common::{
config::TomlConfigLoader, global_ctx::GlobalCtx, log, scoped_task::ScopedTask,
set_default_machine_id, stun::MockStunInfoCollector,
config::TomlConfigLoader, global_ctx::GlobalCtx, log, os_info::collect_device_os_info,
scoped_task::ScopedTask, set_default_machine_id, stun::MockStunInfoCollector,
},
connector::create_connector_by_url,
instance_manager::{DaemonGuard, NetworkInstanceManager},
@@ -62,6 +62,7 @@ impl WebClient {
let controller = Arc::new(controller::Controller::new(
token.to_string(),
hostname.to_string(),
collect_device_os_info(),
manager,
hooks,
));
+2
View File
@@ -69,6 +69,7 @@ impl Session {
let inst_id = uuid::Uuid::new_v4();
let token = controller.upgrade().unwrap().token();
let hostname = controller.upgrade().unwrap().hostname();
let device_os = controller.upgrade().unwrap().device_os();
let ctx_clone = ctx.clone();
let mut tick = interval(std::time::Duration::from_secs(1));
@@ -91,6 +92,7 @@ impl Session {
easytier_version: EASYTIER_VERSION.to_string(),
hostname: hostname.clone(),
report_time: chrono::Local::now().to_rfc3339(),
device_os: Some(device_os.clone()),
running_network_instances: controller
.list_network_instance_ids()