feat: support allocating public IPv6 addresses from a provider

Add a provider/leaser architecture for public IPv6 address allocation
between nodes in the same network:

- A node with `--ipv6-public-addr-provider` advertises a delegable
  public IPv6 prefix (auto-detected from kernel routes or manually
  configured via `--ipv6-public-addr-prefix`).
- Other nodes with `--ipv6-public-addr-auto` request a /128 lease from
  the selected provider via a new RPC service (PublicIpv6AddrRpc).
- Leases have a 30s TTL, renewed every 10s by the client routine.
- The provider allocates addresses deterministically from its prefix
  using instance-UUID-based hashing to prefer stable assignments.
- Routes to peer leases are installed on the TUN device, and each
  client's own /128 is assigned as its IPv6 address.

Also includes netlink IPv6 route table inspection, integration tests,
and event-driven route/address reconciliation.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
sijie.sun
2026-04-25 20:25:42 +08:00
parent b20075e3dc
commit 7908f9c146
21 changed files with 2807 additions and 28 deletions
+9
View File
@@ -39,6 +39,15 @@ core_clap:
ipv6: ipv6:
en: "ipv6 address of this vpn node, can be used together with ipv4 for dual-stack operation" en: "ipv6 address of this vpn node, can be used together with ipv4 for dual-stack operation"
zh-CN: "此VPN节点的IPv6地址,可与IPv4一起使用以进行双栈操作" zh-CN: "此VPN节点的IPv6地址,可与IPv4一起使用以进行双栈操作"
ipv6_public_addr_provider:
en: "share this node's public IPv6 subnet with other peers so they can obtain public IPv6 addresses (Linux only)"
zh-CN: "将此节点的公网 IPv6 子网共享给其他节点,使它们也能获得公网 IPv6 地址(仅 Linux 支持)"
ipv6_public_addr_auto:
en: "auto-obtain a public IPv6 address from a peer that shares its IPv6 subnet"
zh-CN: "自动从共享了 IPv6 子网的对等节点获取一个公网 IPv6 地址"
ipv6_public_addr_prefix:
en: "manually specify the public IPv6 subnet to share, instead of auto-detecting from system routes"
zh-CN: "手动指定要共享的公网 IPv6 子网,不自动从系统路由检测"
dhcp: dhcp:
en: "automatically determine and set IP address by Easytier, and the IP address starts from 10.0.0.1 by default. Warning, if there is an IP conflict in the network when using DHCP, the IP will be automatically changed." en: "automatically determine and set IP address by Easytier, and the IP address starts from 10.0.0.1 by default. Warning, if there is an IP conflict in the network when using DHCP, the IP will be automatically changed."
zh-CN: "由Easytier自动确定并设置IP地址,默认从10.0.0.1开始。警告:在使用DHCP时,如果网络中出现IP冲突,IP将自动更改。" zh-CN: "由Easytier自动确定并设置IP地址,默认从10.0.0.1开始。警告:在使用DHCP时,如果网络中出现IP冲突,IP将自动更改。"
+69
View File
@@ -170,6 +170,15 @@ pub trait ConfigLoader: Send + Sync {
fn get_ipv6(&self) -> Option<cidr::Ipv6Inet>; fn get_ipv6(&self) -> Option<cidr::Ipv6Inet>;
fn set_ipv6(&self, addr: Option<cidr::Ipv6Inet>); fn set_ipv6(&self, addr: Option<cidr::Ipv6Inet>);
fn get_ipv6_public_addr_provider(&self) -> bool;
fn set_ipv6_public_addr_provider(&self, enabled: bool);
fn get_ipv6_public_addr_auto(&self) -> bool;
fn set_ipv6_public_addr_auto(&self, enabled: bool);
fn get_ipv6_public_addr_prefix(&self) -> Option<cidr::Ipv6Cidr>;
fn set_ipv6_public_addr_prefix(&self, prefix: Option<cidr::Ipv6Cidr>);
fn get_dhcp(&self) -> bool; fn get_dhcp(&self) -> bool;
fn set_dhcp(&self, dhcp: bool); fn set_dhcp(&self, dhcp: bool);
@@ -519,6 +528,9 @@ struct Config {
instance_id: Option<uuid::Uuid>, instance_id: Option<uuid::Uuid>,
ipv4: Option<String>, ipv4: Option<String>,
ipv6: Option<String>, ipv6: Option<String>,
ipv6_public_addr_provider: Option<bool>,
ipv6_public_addr_auto: Option<bool>,
ipv6_public_addr_prefix: Option<String>,
dhcp: Option<bool>, dhcp: Option<bool>,
network_identity: Option<NetworkIdentity>, network_identity: Option<NetworkIdentity>,
listeners: Option<Vec<url::Url>>, listeners: Option<Vec<url::Url>>,
@@ -700,6 +712,43 @@ impl ConfigLoader for TomlConfigLoader {
self.config.lock().unwrap().ipv6 = addr.map(|addr| addr.to_string()); self.config.lock().unwrap().ipv6 = addr.map(|addr| addr.to_string());
} }
fn get_ipv6_public_addr_provider(&self) -> bool {
self.config
.lock()
.unwrap()
.ipv6_public_addr_provider
.unwrap_or_default()
}
fn set_ipv6_public_addr_provider(&self, enabled: bool) {
self.config.lock().unwrap().ipv6_public_addr_provider = Some(enabled);
}
fn get_ipv6_public_addr_auto(&self) -> bool {
self.config
.lock()
.unwrap()
.ipv6_public_addr_auto
.unwrap_or_default()
}
fn set_ipv6_public_addr_auto(&self, enabled: bool) {
self.config.lock().unwrap().ipv6_public_addr_auto = Some(enabled);
}
fn get_ipv6_public_addr_prefix(&self) -> Option<cidr::Ipv6Cidr> {
let locked_config = self.config.lock().unwrap();
locked_config
.ipv6_public_addr_prefix
.as_ref()
.and_then(|s| s.parse().ok())
}
fn set_ipv6_public_addr_prefix(&self, prefix: Option<cidr::Ipv6Cidr>) {
self.config.lock().unwrap().ipv6_public_addr_prefix =
prefix.map(|prefix| prefix.to_string());
}
fn get_dhcp(&self) -> bool { fn get_dhcp(&self) -> bool {
self.config.lock().unwrap().dhcp.unwrap_or_default() self.config.lock().unwrap().dhcp.unwrap_or_default()
} }
@@ -1312,6 +1361,26 @@ source = "user"
assert!(!explicit_user.dump().contains("[source]")); assert!(!explicit_user.dump().contains("[source]"));
} }
#[test]
fn test_ipv6_public_addr_config_roundtrip() {
let config = TomlConfigLoader::default();
let prefix: cidr::Ipv6Cidr = "2001:db8:100::/64".parse().unwrap();
config.set_ipv6_public_addr_provider(true);
config.set_ipv6_public_addr_auto(true);
config.set_ipv6_public_addr_prefix(Some(prefix));
assert!(config.get_ipv6_public_addr_provider());
assert!(config.get_ipv6_public_addr_auto());
assert_eq!(config.get_ipv6_public_addr_prefix(), Some(prefix));
let dumped = config.dump();
let loaded = TomlConfigLoader::new_from_str(&dumped).unwrap();
assert!(loaded.get_ipv6_public_addr_provider());
assert!(loaded.get_ipv6_public_addr_auto());
assert_eq!(loaded.get_ipv6_public_addr_prefix(), Some(prefix));
}
#[tokio::test] #[tokio::test]
async fn full_example_test() { async fn full_example_test() {
let config_str = r#" let config_str = r#"
+3
View File
@@ -68,6 +68,8 @@ pub enum GlobalCtxEvent {
DhcpIpv4Changed(Option<cidr::Ipv4Inet>, Option<cidr::Ipv4Inet>), // (old, new) DhcpIpv4Changed(Option<cidr::Ipv4Inet>, Option<cidr::Ipv4Inet>), // (old, new)
DhcpIpv4Conflicted(Option<cidr::Ipv4Inet>), DhcpIpv4Conflicted(Option<cidr::Ipv4Inet>),
PublicIpv6Changed(Option<cidr::Ipv6Inet>, Option<cidr::Ipv6Inet>), // (old, new)
PublicIpv6RoutesUpdated(Vec<cidr::Ipv6Inet>, Vec<cidr::Ipv6Inet>), // (added, removed)
PortForwardAdded(PortForwardConfigPb), PortForwardAdded(PortForwardConfigPb),
@@ -770,6 +772,7 @@ pub mod tests {
assert!(feature_flags.support_conn_list_sync); assert!(feature_flags.support_conn_list_sync);
assert!(feature_flags.avoid_relay_data); assert!(feature_flags.avoid_relay_data);
assert!(feature_flags.is_public_server); assert!(feature_flags.is_public_server);
assert!(!feature_flags.ipv6_public_addr_provider);
} }
#[tokio::test] #[tokio::test]
+11
View File
@@ -166,3 +166,14 @@ pub type IfConfiger = DummyIfConfiger;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
pub use windows::RegistryManager; pub use windows::RegistryManager;
#[cfg(target_os = "linux")]
pub(crate) fn list_ipv6_route_messages()
-> Result<Vec<netlink_packet_route::route::RouteMessage>, Error> {
netlink::NetlinkIfConfiger::list_ipv6_route_messages()
}
#[cfg(target_os = "linux")]
pub(crate) fn get_interface_index(name: &str) -> Result<u32, Error> {
netlink::NetlinkIfConfiger::get_interface_index(name)
}
+200 -16
View File
@@ -160,7 +160,7 @@ impl From<RouteMessage> for Route {
pub struct NetlinkIfConfiger {} pub struct NetlinkIfConfiger {}
impl NetlinkIfConfiger { impl NetlinkIfConfiger {
fn get_interface_index(name: &str) -> Result<u32, Error> { pub(crate) fn get_interface_index(name: &str) -> Result<u32, Error> {
let name = CString::new(name).with_context(|| "failed to convert interface name")?; let name = CString::new(name).with_context(|| "failed to convert interface name")?;
match unsafe { libc::if_nametoindex(name.as_ptr()) } { match unsafe { libc::if_nametoindex(name.as_ptr()) } {
0 => Err(std::io::Error::last_os_error().into()), 0 => Err(std::io::Error::last_os_error().into()),
@@ -311,7 +311,7 @@ impl NetlinkIfConfiger {
Self::set_flags_op(name, SIOCGIFFLAGS, InterfaceFlags::empty()) Self::set_flags_op(name, SIOCGIFFLAGS, InterfaceFlags::empty())
} }
fn list_routes() -> Result<Vec<RouteMessage>, Error> { fn list_route_messages(address_family: AddressFamily) -> Result<Vec<RouteMessage>, Error> {
let mut message = RouteMessage::default(); let mut message = RouteMessage::default();
message.header.table = RouteHeader::RT_TABLE_UNSPEC; message.header.table = RouteHeader::RT_TABLE_UNSPEC;
@@ -320,7 +320,7 @@ impl NetlinkIfConfiger {
message.header.scope = RouteScope::Universe; message.header.scope = RouteScope::Universe;
message.header.kind = RouteType::Unicast; message.header.kind = RouteType::Unicast;
message.header.address_family = AddressFamily::Inet; message.header.address_family = address_family;
message.header.destination_prefix_length = 0; message.header.destination_prefix_length = 0;
message.header.source_prefix_length = 0; message.header.source_prefix_length = 0;
@@ -367,6 +367,14 @@ impl NetlinkIfConfiger {
Ok(ret_vec) Ok(ret_vec)
} }
fn list_routes() -> Result<Vec<RouteMessage>, Error> {
Self::list_route_messages(AddressFamily::Inet)
}
pub(crate) fn list_ipv6_route_messages() -> Result<Vec<RouteMessage>, Error> {
Self::list_route_messages(AddressFamily::Inet6)
}
} }
#[async_trait] #[async_trait]
@@ -551,12 +559,9 @@ impl IfConfiguerTrait for NetlinkIfConfiger {
message.header.scope = RouteScope::Universe; message.header.scope = RouteScope::Universe;
message.header.kind = RouteType::Unicast; message.header.kind = RouteType::Unicast;
// Add metric (cost) if specified message
if let Some(cost) = cost { .attributes
message .push(RouteAttribute::Priority(cost.unwrap_or(65535) as u32));
.attributes
.push(RouteAttribute::Priority(cost as u32));
}
message message
.attributes .attributes
@@ -564,9 +569,11 @@ impl IfConfiguerTrait for NetlinkIfConfiger {
name, name,
)?)); )?));
message if cidr_prefix != 0 {
.attributes message
.push(RouteAttribute::Destination(RouteAddress::Inet6(address))); .attributes
.push(RouteAttribute::Destination(RouteAddress::Inet6(address)));
}
send_netlink_req_and_wait_one_resp(RouteNetlinkMessage::NewRoute(message), false) send_netlink_req_and_wait_one_resp(RouteNetlinkMessage::NewRoute(message), false)
} }
@@ -577,7 +584,7 @@ impl IfConfiguerTrait for NetlinkIfConfiger {
address: std::net::Ipv6Addr, address: std::net::Ipv6Addr,
cidr_prefix: u8, cidr_prefix: u8,
) -> Result<(), Error> { ) -> Result<(), Error> {
let routes = Self::list_routes()?; let routes = Self::list_route_messages(AddressFamily::Inet6)?;
let ifidx = NetlinkIfConfiger::get_interface_index(name)?; let ifidx = NetlinkIfConfiger::get_interface_index(name)?;
for msg in routes { for msg in routes {
@@ -598,29 +605,82 @@ impl IfConfiguerTrait for NetlinkIfConfiger {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::process::Command;
const DUMMY_IFACE_NAME: &str = "dummy"; const DUMMY_IFACE_NAME: &str = "dummy";
fn run_cmd(cmd: &str) -> String { fn run_cmd(cmd: &str) -> String {
let output = std::process::Command::new("sh") let output = Command::new("sh")
.arg("-c") .arg("-c")
.arg(cmd) .arg(cmd)
.output() .output()
.expect("failed to execute process"); .expect("failed to execute process");
assert!(
output.status.success(),
"command failed: {cmd}\nstdout: {}\nstderr: {}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr),
);
String::from_utf8(output.stdout).unwrap() String::from_utf8(output.stdout).unwrap()
} }
fn run_ip(args: &[&str]) {
let output = Command::new("ip")
.args(args)
.output()
.expect("failed to execute ip process");
assert!(
output.status.success(),
"ip command failed: {:?}\nstdout: {}\nstderr: {}",
args,
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr),
);
}
fn test_iface_name(tag: &str) -> String {
format!("et{}{:x}", tag, std::process::id() & 0xffff)
}
struct ScopedDummyLink {
name: String,
}
impl ScopedDummyLink {
fn new(name: &str) -> Self {
let _ = Command::new("ip").args(["link", "del", name]).output();
run_ip(&["link", "add", name, "type", "dummy"]);
run_ip(&["link", "set", name, "up"]);
Self {
name: name.to_string(),
}
}
}
impl Drop for ScopedDummyLink {
fn drop(&mut self) {
let _ = Command::new("ip")
.args(["link", "del", &self.name])
.output();
}
}
struct PrepareEnv {} struct PrepareEnv {}
impl PrepareEnv { impl PrepareEnv {
fn new() -> Self { fn new() -> Self {
let _ = run_cmd(&format!("sudo ip link add {} type dummy", DUMMY_IFACE_NAME)); let _ = Command::new("ip")
.args(["link", "del", DUMMY_IFACE_NAME])
.output();
let _ = run_cmd(&format!("ip link add {} type dummy", DUMMY_IFACE_NAME));
PrepareEnv {} PrepareEnv {}
} }
} }
impl Drop for PrepareEnv { impl Drop for PrepareEnv {
fn drop(&mut self) { fn drop(&mut self) {
let _ = run_cmd(&format!("sudo ip link del {}", DUMMY_IFACE_NAME)); let _ = Command::new("ip")
.args(["link", "del", DUMMY_IFACE_NAME])
.output();
} }
} }
@@ -701,4 +761,128 @@ mod tests {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
assert!(!routes.contains(&IpAddr::V4("10.5.5.0".parse().unwrap()))); assert!(!routes.contains(&IpAddr::V4("10.5.5.0".parse().unwrap())));
} }
#[serial_test::serial]
#[tokio::test]
async fn ipv6_addr_readback_test() {
let iface = test_iface_name("a");
let _link = ScopedDummyLink::new(&iface);
run_ip(&["-6", "addr", "add", "2001:db8:1234::2/64", "dev", &iface]);
let addrs = NetlinkIfConfiger::list_addresses(&iface).unwrap();
assert!(addrs.iter().any(|addr| {
addr.address() == IpAddr::V6("2001:db8:1234::2".parse().unwrap())
&& addr.network_length() == 64
}));
}
#[serial_test::serial]
#[tokio::test]
async fn ipv6_route_readback_test() {
let wan_if = test_iface_name("rw");
let lan_if = test_iface_name("rl");
let _wan = ScopedDummyLink::new(&wan_if);
let _lan = ScopedDummyLink::new(&lan_if);
run_ip(&[
"-6",
"addr",
"add",
"2001:db8:100:ffff::2/64",
"dev",
&wan_if,
]);
run_ip(&[
"-6",
"route",
"add",
"default",
"from",
"2001:db8:100::/56",
"dev",
&wan_if,
]);
run_ip(&["-6", "route", "add", "2001:db8:100::/56", "dev", &lan_if]);
let wan_ifindex = NetlinkIfConfiger::get_interface_index(&wan_if).unwrap();
let lan_ifindex = NetlinkIfConfiger::get_interface_index(&lan_if).unwrap();
let routes = NetlinkIfConfiger::list_ipv6_route_messages().unwrap();
assert!(routes.iter().any(|route| {
route.header.kind == RouteType::Unicast
&& route.header.source_prefix_length == 56
&& route.attributes.iter().any(|attr| {
matches!(
attr,
RouteAttribute::Source(RouteAddress::Inet6(addr))
if *addr == "2001:db8:100::".parse::<std::net::Ipv6Addr>().unwrap()
)
})
&& route
.attributes
.iter()
.any(|attr| matches!(attr, RouteAttribute::Oif(index) if *index == wan_ifindex))
&& !route
.attributes
.iter()
.any(|attr| matches!(attr, RouteAttribute::Destination(_)))
}));
assert!(routes.iter().any(|route| {
route.header.kind == RouteType::Unicast
&& route.header.destination_prefix_length == 56
&& route.attributes.iter().any(|attr| {
matches!(
attr,
RouteAttribute::Destination(RouteAddress::Inet6(addr))
if *addr == "2001:db8:100::".parse::<std::net::Ipv6Addr>().unwrap()
)
})
&& route
.attributes
.iter()
.any(|attr| matches!(attr, RouteAttribute::Oif(index) if *index == lan_ifindex))
}));
}
#[serial_test::serial]
#[tokio::test]
async fn ipv6_route_remove_test() {
let iface = test_iface_name("rr");
let _link = ScopedDummyLink::new(&iface);
let ifcfg = NetlinkIfConfiger {};
let route_addr = "2001:db8:200::".parse::<std::net::Ipv6Addr>().unwrap();
ifcfg
.add_ipv6_route(&iface, route_addr, 56, None)
.await
.unwrap();
let ifindex = NetlinkIfConfiger::get_interface_index(&iface).unwrap();
let has_route = |routes: &[RouteMessage]| {
routes.iter().any(|route| {
route.header.destination_prefix_length == 56
&& route.attributes.iter().any(|attr| {
matches!(
attr,
RouteAttribute::Destination(RouteAddress::Inet6(addr)) if *addr == route_addr
)
})
&& route
.attributes
.iter()
.any(|attr| matches!(attr, RouteAttribute::Oif(index) if *index == ifindex))
})
};
let routes = NetlinkIfConfiger::list_ipv6_route_messages().unwrap();
assert!(has_route(&routes));
ifcfg
.remove_ipv6_route(&iface, route_addr, 56)
.await
.unwrap();
let routes = NetlinkIfConfiger::list_ipv6_route_messages().unwrap();
assert!(!has_route(&routes));
}
} }
+39
View File
@@ -171,6 +171,31 @@ struct NetworkOptions {
)] )]
ipv6: Option<String>, ipv6: Option<String>,
#[arg(
long,
env = "ET_IPV6_PUBLIC_ADDR_PROVIDER",
help = t!("core_clap.ipv6_public_addr_provider").to_string(),
num_args = 0..=1,
default_missing_value = "true"
)]
ipv6_public_addr_provider: Option<bool>,
#[arg(
long,
env = "ET_IPV6_PUBLIC_ADDR_AUTO",
help = t!("core_clap.ipv6_public_addr_auto").to_string(),
num_args = 0..=1,
default_missing_value = "true"
)]
ipv6_public_addr_auto: Option<bool>,
#[arg(
long,
env = "ET_IPV6_PUBLIC_ADDR_PREFIX",
help = t!("core_clap.ipv6_public_addr_prefix").to_string()
)]
ipv6_public_addr_prefix: Option<String>,
#[arg( #[arg(
short, short,
long, long,
@@ -875,6 +900,20 @@ impl NetworkOptions {
})?)) })?))
} }
if let Some(enabled) = self.ipv6_public_addr_provider {
cfg.set_ipv6_public_addr_provider(enabled);
}
if let Some(enabled) = self.ipv6_public_addr_auto {
cfg.set_ipv6_public_addr_auto(enabled);
}
if let Some(prefix) = &self.ipv6_public_addr_prefix {
cfg.set_ipv6_public_addr_prefix(Some(prefix.parse().with_context(|| {
format!("failed to parse ipv6 public address prefix: {}", prefix)
})?));
}
if !self.peers.is_empty() { if !self.peers.is_empty() {
let mut peers = cfg.get_peers(); let mut peers = cfg.get_peers();
peers.reserve(peers.len() + self.peers.len()); peers.reserve(peers.len() + self.peers.len());
+572 -2
View File
@@ -1,16 +1,21 @@
#[cfg(feature = "tun")] #[cfg(feature = "tun")]
use std::any::Any; use std::any::Any;
use std::collections::HashSet; use std::collections::HashSet;
#[cfg(target_os = "linux")]
use std::net::Ipv6Addr;
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
#[cfg(target_os = "linux")]
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
#[cfg(feature = "tun")] #[cfg(feature = "tun")]
use std::time::Duration; use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use cidr::{IpCidr, Ipv4Inet}; use cidr::{IpCidr, Ipv4Inet, Ipv6Cidr};
use futures::FutureExt; use futures::FutureExt;
#[cfg(target_os = "linux")]
use netlink_packet_route::route::{RouteAddress, RouteAttribute, RouteMessage, RouteType};
use tokio::sync::{Mutex, Notify}; use tokio::sync::{Mutex, Notify};
#[cfg(feature = "tun")] #[cfg(feature = "tun")]
use tokio::{sync::oneshot, task::JoinSet}; use tokio::{sync::oneshot, task::JoinSet};
@@ -23,6 +28,8 @@ use crate::common::acl_processor::AclRuleBuilder;
use crate::common::config::ConfigLoader; use crate::common::config::ConfigLoader;
use crate::common::error::Error; use crate::common::error::Error;
use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent}; use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent};
#[cfg(target_os = "linux")]
use crate::common::ifcfg::{get_interface_index, list_ipv6_route_messages};
use crate::connector::direct::DirectConnectorManager; use crate::connector::direct::DirectConnectorManager;
use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManager}; use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManager};
use crate::connector::tcp_hole_punch::TcpHolePunchConnector; use crate::connector::tcp_hole_punch::TcpHolePunchConnector;
@@ -195,6 +202,202 @@ pub struct InstanceRpcServerHook {
rpc_portal_whitelist: Vec<IpCidr>, rpc_portal_whitelist: Vec<IpCidr>,
} }
fn is_global_routable_public_ipv6_prefix(prefix: Ipv6Cidr) -> bool {
let addr = prefix.first_address();
!addr.is_loopback()
&& !addr.is_multicast()
&& !addr.is_unicast_link_local()
&& !addr.is_unique_local()
&& !addr.is_unspecified()
}
fn ensure_public_ipv6_provider_supported() -> Result<(), Error> {
if cfg!(target_os = "linux") {
return Ok(());
}
Err(anyhow::anyhow!(
"the provider feature requires Linux; run without --ipv6-public-addr-provider on this node, or move the provider role to a Linux node. client mode (--ipv6-public-addr-auto) works on all platforms"
)
.into())
}
fn public_ipv6_provider_auto_detect_error() -> Error {
anyhow::anyhow!(
"no public IPv6 prefix found on this system; set --ipv6-public-addr-prefix manually, or check that your ISP has delegated an IPv6 prefix and a default-from route exists in the kernel routing table"
)
.into()
}
#[cfg(target_os = "linux")]
fn read_linux_proc_bool(path: &Path) -> Result<bool, Error> {
let value = std::fs::read_to_string(path)
.with_context(|| format!("failed to read {}", path.display()))?;
match value.trim() {
"0" => Ok(false),
"1" => Ok(true),
other => Err(anyhow::anyhow!("unexpected value '{}' in {}", other, path.display()).into()),
}
}
#[cfg(target_os = "linux")]
fn write_linux_proc_bool(path: &Path, enabled: bool) -> Result<(), Error> {
let value = if enabled { "1\n" } else { "0\n" };
std::fs::write(path, value).with_context(|| format!("failed to write {}", path.display()))?;
Ok(())
}
#[cfg(target_os = "linux")]
fn ensure_linux_ipv6_forwarding_at_paths(
all_path: &Path,
default_path: &Path,
) -> Result<bool, Error> {
let all_enabled = read_linux_proc_bool(all_path)?;
let default_enabled = read_linux_proc_bool(default_path)?;
let mut changed = false;
if !all_enabled {
write_linux_proc_bool(all_path, true)?;
changed = true;
}
if !default_enabled {
write_linux_proc_bool(default_path, true)?;
changed = true;
}
if !read_linux_proc_bool(all_path)? || !read_linux_proc_bool(default_path)? {
return Err(anyhow::anyhow!(
"failed to enable Linux IPv6 forwarding in {} and {}",
all_path.display(),
default_path.display()
)
.into());
}
Ok(changed)
}
#[cfg(target_os = "linux")]
fn ensure_linux_ipv6_forwarding() -> Result<bool, Error> {
let all_path = Path::new("/proc/sys/net/ipv6/conf/all/forwarding");
let default_path = Path::new("/proc/sys/net/ipv6/conf/default/forwarding");
ensure_linux_ipv6_forwarding_at_paths(all_path, default_path).map_err(|err| {
anyhow::anyhow!(
"public IPv6 provider requires Linux IPv6 forwarding; failed to enable net.ipv6.conf.all.forwarding=1 and net.ipv6.conf.default.forwarding=1 automatically: {}. run with sufficient privileges or set them manually",
err
)
.into()
})
}
#[cfg(target_os = "linux")]
#[derive(Clone, Debug, PartialEq, Eq)]
struct DetectedIpv6Route {
dst: Option<Ipv6Cidr>,
src: Option<Ipv6Cidr>,
ifindex: Option<u32>,
kind: RouteType,
}
#[cfg(target_os = "linux")]
fn ipv6_cidr_from_route_addr(addr: RouteAddress, prefix_len: u8) -> Option<Ipv6Cidr> {
match addr {
RouteAddress::Inet6(addr) => Ipv6Cidr::new(addr, prefix_len).ok(),
_ => None,
}
}
#[cfg(target_os = "linux")]
impl TryFrom<RouteMessage> for DetectedIpv6Route {
type Error = Error;
fn try_from(message: RouteMessage) -> Result<Self, Self::Error> {
let dst = message.attributes.iter().find_map(|attr| match attr {
RouteAttribute::Destination(addr) => {
ipv6_cidr_from_route_addr(addr.clone(), message.header.destination_prefix_length)
}
_ => None,
});
let src = message.attributes.iter().find_map(|attr| match attr {
RouteAttribute::Source(addr) => {
ipv6_cidr_from_route_addr(addr.clone(), message.header.source_prefix_length)
}
_ => None,
});
let ifindex = message.attributes.iter().find_map(|attr| match attr {
RouteAttribute::Oif(index) => Some(*index),
_ => None,
});
Ok(Self {
dst,
src,
ifindex,
kind: message.header.kind,
})
}
}
#[cfg(target_os = "linux")]
fn is_ipv6_default_route(dst: Option<Ipv6Cidr>) -> bool {
dst.is_none() || dst == Some(Ipv6Cidr::new(Ipv6Addr::UNSPECIFIED, 0).unwrap())
}
#[cfg(target_os = "linux")]
fn detect_public_ipv6_prefix_from_routes(
routes: &[DetectedIpv6Route],
loopback_ifindex: u32,
) -> Option<Ipv6Cidr> {
routes
.iter()
.filter_map(|route| {
if !is_ipv6_default_route(route.dst) {
return None;
}
let prefix = route.src?;
let wan_ifindex = route.ifindex?;
if !is_global_routable_public_ipv6_prefix(prefix) {
return None;
}
let delegated = routes.iter().any(|candidate| {
candidate.dst == Some(prefix)
&& candidate.ifindex.is_some()
&& candidate.ifindex != Some(wan_ifindex)
&& candidate.ifindex != Some(loopback_ifindex)
&& candidate.kind == RouteType::Unicast
});
delegated.then_some(prefix)
})
.min_by_key(|prefix| prefix.network_length())
}
#[cfg(target_os = "linux")]
async fn detect_public_ipv6_prefix_linux() -> Result<Option<Ipv6Cidr>, Error> {
let routes = list_ipv6_route_messages().with_context(|| "failed to query linux ipv6 routes")?;
let routes = routes
.iter()
.cloned()
.map(DetectedIpv6Route::try_from)
.collect::<Result<Vec<_>, _>>()?;
let loopback_ifindex =
get_interface_index("lo").with_context(|| "failed to resolve linux loopback ifindex")?;
Ok(detect_public_ipv6_prefix_from_routes(
&routes,
loopback_ifindex,
))
}
#[cfg(not(target_os = "linux"))]
async fn detect_public_ipv6_prefix_linux() -> Result<Option<Ipv6Cidr>, Error> {
Ok(None)
}
impl InstanceRpcServerHook { impl InstanceRpcServerHook {
pub fn new(rpc_portal_whitelist: Option<Vec<IpCidr>>) -> Self { pub fn new(rpc_portal_whitelist: Option<Vec<IpCidr>>) -> Self {
let rpc_portal_whitelist = rpc_portal_whitelist let rpc_portal_whitelist = rpc_portal_whitelist
@@ -664,6 +867,61 @@ impl Instance {
Ok(()) Ok(())
} }
async fn prepare_public_ipv6_config(&self) -> Result<(), Error> {
if self.global_ctx.config.get_ipv6_public_addr_auto()
&& self.global_ctx.get_ipv6().is_some()
{
return Err(anyhow::anyhow!(
"cannot use --ipv6-public-addr-auto together with a manually set --ipv6; pick one or the other"
)
.into());
}
if !self.global_ctx.config.get_ipv6_public_addr_provider() {
let mut feature_flags = self.global_ctx.get_feature_flags();
feature_flags.ipv6_public_addr_provider = false;
self.global_ctx.set_feature_flags(feature_flags);
return Ok(());
}
ensure_public_ipv6_provider_supported()?;
let prefix = if let Some(prefix) = self.global_ctx.config.get_ipv6_public_addr_prefix() {
prefix
} else {
let _g = self.global_ctx.net_ns.guard();
detect_public_ipv6_prefix_linux()
.await?
.ok_or_else(public_ipv6_provider_auto_detect_error)?
};
if !is_global_routable_public_ipv6_prefix(prefix) {
return Err(anyhow::anyhow!(
"the prefix {} is not a valid global unicast IPv6 prefix; it must be a routable address range, not a private, link-local, or multicast address",
prefix
)
.into());
}
#[cfg(target_os = "linux")]
{
let _g = self.global_ctx.net_ns.guard();
if ensure_linux_ipv6_forwarding()? {
tracing::info!(
"enabled Linux IPv6 forwarding for public IPv6 provider at runtime; this change is not persisted across reboot"
);
}
}
self.global_ctx
.config
.set_ipv6_public_addr_prefix(Some(prefix));
let mut feature_flags = self.global_ctx.get_feature_flags();
feature_flags.ipv6_public_addr_provider = true;
self.global_ctx.set_feature_flags(feature_flags);
Ok(())
}
// use a mock nic ctx to consume packets. // use a mock nic ctx to consume packets.
#[cfg(feature = "tun")] #[cfg(feature = "tun")]
async fn clear_nic_ctx( async fn clear_nic_ctx(
@@ -932,6 +1190,7 @@ impl Instance {
} }
pub async fn run(&mut self) -> Result<(), Error> { pub async fn run(&mut self) -> Result<(), Error> {
self.prepare_public_ipv6_config().await?;
self.listener_manager self.listener_manager
.lock() .lock()
.await .await
@@ -1543,10 +1802,88 @@ impl Drop for Instance {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[cfg(target_os = "linux")]
use netlink_packet_route::route::RouteType;
#[cfg(target_os = "linux")]
use std::fs;
#[cfg(target_os = "linux")]
use std::path::PathBuf;
#[cfg(target_os = "linux")]
use std::process::Command;
use crate::{ use crate::{
instance::instance::InstanceRpcServerHook, proto::rpc_impl::standalone::RpcServerHook, instance::instance::InstanceRpcServerHook, proto::rpc_impl::standalone::RpcServerHook,
}; };
#[cfg(target_os = "linux")]
use super::{
DetectedIpv6Route, detect_public_ipv6_prefix_from_routes, detect_public_ipv6_prefix_linux,
ensure_linux_ipv6_forwarding_at_paths, ensure_public_ipv6_provider_supported,
public_ipv6_provider_auto_detect_error,
};
#[cfg(not(target_os = "linux"))]
use super::{ensure_public_ipv6_provider_supported, public_ipv6_provider_auto_detect_error};
#[cfg(target_os = "linux")]
fn run_ip(args: &[&str]) {
let output = Command::new("ip")
.args(args)
.output()
.expect("failed to execute ip process");
assert!(
output.status.success(),
"ip command failed: {:?}\nstdout: {}\nstderr: {}",
args,
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr),
);
}
#[cfg(target_os = "linux")]
fn test_iface_name(tag: &str) -> String {
format!("et{}{:x}", tag, std::process::id() & 0xffff)
}
#[cfg(target_os = "linux")]
struct ScopedDummyLink {
name: String,
}
#[cfg(target_os = "linux")]
impl ScopedDummyLink {
fn new(name: &str) -> Self {
let _ = Command::new("ip").args(["link", "del", name]).output();
run_ip(&["link", "add", name, "type", "dummy"]);
run_ip(&["link", "set", name, "up"]);
Self {
name: name.to_string(),
}
}
}
#[cfg(target_os = "linux")]
impl Drop for ScopedDummyLink {
fn drop(&mut self) {
let _ = Command::new("ip")
.args(["link", "del", &self.name])
.output();
}
}
#[cfg(target_os = "linux")]
fn temp_forwarding_paths(
all_value: &str,
default_value: &str,
) -> (tempfile::TempDir, PathBuf, PathBuf) {
let dir = tempfile::tempdir().unwrap();
let all_path = dir.path().join("all_forwarding");
let default_path = dir.path().join("default_forwarding");
fs::write(&all_path, all_value).unwrap();
fs::write(&default_path, default_value).unwrap();
(dir, all_path, default_path)
}
#[tokio::test] #[tokio::test]
async fn test_rpc_portal_whitelist() { async fn test_rpc_portal_whitelist() {
use cidr::IpCidr; use cidr::IpCidr;
@@ -1665,4 +2002,237 @@ mod tests {
} }
} }
} }
#[cfg(target_os = "linux")]
fn route(
dst: Option<&str>,
src: Option<&str>,
ifindex: Option<u32>,
kind: RouteType,
) -> DetectedIpv6Route {
DetectedIpv6Route {
dst: dst.map(|cidr| cidr.parse().unwrap()),
src: src.map(|cidr| cidr.parse().unwrap()),
ifindex,
kind,
}
}
#[cfg(target_os = "linux")]
#[test]
fn test_detect_public_ipv6_prefix_from_routes_selects_delegated_prefix() {
let routes = vec![
route(None, Some("2001:db8:1::/56"), Some(2), RouteType::Unicast),
route(Some("2001:db8:1::/56"), None, Some(3), RouteType::Unicast),
];
assert_eq!(
detect_public_ipv6_prefix_from_routes(&routes, 1),
Some("2001:db8:1::/56".parse().unwrap())
);
}
#[cfg(target_os = "linux")]
#[test]
fn test_detect_public_ipv6_prefix_from_routes_rejects_non_public_prefixes() {
let routes = vec![
route(Some("::/0"), Some("fd00::/48"), Some(2), RouteType::Unicast),
route(Some("fd00::/48"), None, Some(3), RouteType::Unicast),
route(None, Some("fe80::/64"), Some(4), RouteType::Unicast),
route(Some("fe80::/64"), None, Some(5), RouteType::Unicast),
route(None, Some("ff00::/8"), Some(6), RouteType::Unicast),
route(Some("ff00::/8"), None, Some(7), RouteType::Unicast),
route(None, Some("::/0"), Some(8), RouteType::Unicast),
route(Some("::/0"), None, Some(9), RouteType::Unicast),
];
assert_eq!(detect_public_ipv6_prefix_from_routes(&routes, 1), None);
}
#[cfg(target_os = "linux")]
#[test]
fn test_detect_public_ipv6_prefix_from_routes_requires_delegated_route() {
let routes = vec![route(
None,
Some("2001:db8:1::/56"),
Some(2),
RouteType::Unicast,
)];
assert_eq!(detect_public_ipv6_prefix_from_routes(&routes, 1), None);
}
#[cfg(target_os = "linux")]
#[test]
fn test_detect_public_ipv6_prefix_from_routes_rejects_loopback_delegation() {
let routes = vec![
route(None, Some("2001:db8:1::/56"), Some(2), RouteType::Unicast),
route(Some("2001:db8:1::/56"), None, Some(1), RouteType::Unicast),
];
assert_eq!(detect_public_ipv6_prefix_from_routes(&routes, 1), None);
}
#[cfg(target_os = "linux")]
#[test]
fn test_detect_public_ipv6_prefix_from_routes_prefers_shortest_prefix() {
let routes = vec![
route(None, Some("2001:db8:1::/56"), Some(2), RouteType::Unicast),
route(Some("2001:db8:1::/56"), None, Some(3), RouteType::Unicast),
route(None, Some("2001:db8::/48"), Some(4), RouteType::Unicast),
route(Some("2001:db8::/48"), None, Some(5), RouteType::Unicast),
];
assert_eq!(
detect_public_ipv6_prefix_from_routes(&routes, 1),
Some("2001:db8::/48".parse().unwrap())
);
}
#[cfg(target_os = "linux")]
#[test]
fn test_detect_public_ipv6_prefix_from_routes_rejects_non_unicast_delegation() {
let routes = vec![
route(None, Some("2001:db8:1::/56"), Some(2), RouteType::Unicast),
route(Some("2001:db8:1::/56"), None, Some(3), RouteType::BlackHole),
];
assert_eq!(detect_public_ipv6_prefix_from_routes(&routes, 1), None);
}
#[test]
fn test_public_ipv6_provider_auto_detect_error_mentions_manual_prefix() {
let err = public_ipv6_provider_auto_detect_error();
let msg = err.to_string();
assert!(msg.contains("IPv6 prefix"), "{}", msg);
assert!(msg.contains("ipv6-public-addr-prefix"), "{}", msg);
}
#[cfg(target_os = "linux")]
#[test]
fn test_public_ipv6_provider_platform_check_accepts_linux() {
assert!(ensure_public_ipv6_provider_supported().is_ok());
}
#[cfg(target_os = "linux")]
#[test]
fn test_ensure_linux_ipv6_forwarding_enables_all_and_default() {
let (_dir, all_path, default_path) = temp_forwarding_paths("0\n", "0\n");
let changed = ensure_linux_ipv6_forwarding_at_paths(&all_path, &default_path).unwrap();
assert!(changed);
assert_eq!(fs::read_to_string(&all_path).unwrap(), "1\n");
assert_eq!(fs::read_to_string(&default_path).unwrap(), "1\n");
}
#[cfg(target_os = "linux")]
#[test]
fn test_ensure_linux_ipv6_forwarding_is_noop_when_already_enabled() {
let (_dir, all_path, default_path) = temp_forwarding_paths("1\n", "1\n");
let changed = ensure_linux_ipv6_forwarding_at_paths(&all_path, &default_path).unwrap();
assert!(!changed);
assert_eq!(fs::read_to_string(&all_path).unwrap(), "1\n");
assert_eq!(fs::read_to_string(&default_path).unwrap(), "1\n");
}
#[cfg(not(target_os = "linux"))]
#[test]
fn test_public_ipv6_provider_platform_check_reports_linux_only() {
let err = ensure_public_ipv6_provider_supported().unwrap_err();
let msg = err.to_string();
assert!(msg.contains("Linux"), "{}", msg);
assert!(msg.contains("ipv6-public-addr-auto"), "{}", msg);
}
#[cfg(target_os = "linux")]
#[serial_test::serial]
#[tokio::test]
async fn test_detect_public_ipv6_prefix_linux_reads_netlink_routes_from_kernel() {
let wan_if = test_iface_name("dw");
let lan_if = test_iface_name("dl");
let _wan = ScopedDummyLink::new(&wan_if);
let _lan = ScopedDummyLink::new(&lan_if);
run_ip(&[
"-6",
"addr",
"add",
"2001:db8:100:ffff::1/64",
"dev",
&wan_if,
]);
run_ip(&[
"-6",
"route",
"add",
"default",
"from",
"2001:db8:100::/56",
"dev",
&wan_if,
]);
run_ip(&["-6", "route", "add", "2001:db8:100::/56", "dev", &lan_if]);
assert_eq!(
detect_public_ipv6_prefix_linux().await.unwrap(),
Some("2001:db8:100::/56".parse().unwrap())
);
}
#[cfg(target_os = "linux")]
#[serial_test::serial]
#[tokio::test]
async fn test_detect_public_ipv6_prefix_linux_prefers_shortest_prefix_from_kernel() {
let wan_if_1 = test_iface_name("sw1");
let lan_if_1 = test_iface_name("sl1");
let wan_if_2 = test_iface_name("sw2");
let lan_if_2 = test_iface_name("sl2");
let _wan_1 = ScopedDummyLink::new(&wan_if_1);
let _lan_1 = ScopedDummyLink::new(&lan_if_1);
let _wan_2 = ScopedDummyLink::new(&wan_if_2);
let _lan_2 = ScopedDummyLink::new(&lan_if_2);
run_ip(&[
"-6",
"addr",
"add",
"2001:db8:3000:ffff::1/64",
"dev",
&wan_if_1,
]);
run_ip(&[
"-6",
"route",
"add",
"default",
"from",
"2001:db8:3000::/56",
"dev",
&wan_if_1,
]);
run_ip(&["-6", "route", "add", "2001:db8:3000::/56", "dev", &lan_if_1]);
run_ip(&["-6", "addr", "add", "2001:db9:ffff::1/64", "dev", &wan_if_2]);
run_ip(&[
"-6",
"route",
"add",
"default",
"from",
"2001:db9::/48",
"dev",
&wan_if_2,
]);
run_ip(&["-6", "route", "add", "2001:db9::/48", "dev", &lan_if_2]);
assert_eq!(
detect_public_ipv6_prefix_linux().await.unwrap(),
Some("2001:db9::/48".parse().unwrap())
);
}
} }
+191 -1
View File
@@ -735,9 +735,26 @@ impl VirtualNic {
} }
pub async fn add_ipv6_route(&self, address: Ipv6Addr, cidr: u8) -> Result<(), Error> { pub async fn add_ipv6_route(&self, address: Ipv6Addr, cidr: u8) -> Result<(), Error> {
self.add_ipv6_route_with_cost(address, cidr, None).await
}
pub async fn add_ipv6_route_with_cost(
&self,
address: Ipv6Addr,
cidr: u8,
cost: Option<i32>,
) -> Result<(), Error> {
let _g = self.global_ctx.net_ns.guard(); let _g = self.global_ctx.net_ns.guard();
self.ifcfg self.ifcfg
.add_ipv6_route(self.ifname(), address, cidr, None) .add_ipv6_route(self.ifname(), address, cidr, cost)
.await?;
Ok(())
}
pub async fn remove_ipv6_route(&self, address: Ipv6Addr, cidr: u8) -> Result<(), Error> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg
.remove_ipv6_route(self.ifname(), address, cidr)
.await?; .await?;
Ok(()) Ok(())
} }
@@ -1039,6 +1056,44 @@ impl NicCtx {
} }
} }
async fn apply_public_ipv6_route_changes(
ifcfg: &impl IfConfiguerTrait,
ifname: &str,
net_ns: &crate::common::netns::NetNS,
cur_routes: &mut BTreeSet<cidr::Ipv6Inet>,
added: Vec<cidr::Ipv6Inet>,
removed: Vec<cidr::Ipv6Inet>,
) {
for route in removed {
if !cur_routes.contains(&route) {
continue;
}
let _g = net_ns.guard();
let ret = ifcfg
.remove_ipv6_route(ifname, route.address(), route.network_length())
.await;
if ret.is_err() {
tracing::trace!(route = ?route, err = ?ret, "remove public ipv6 route failed");
}
cur_routes.remove(&route);
}
for route in added {
if cur_routes.contains(&route) {
continue;
}
let _g = net_ns.guard();
let ret = ifcfg
.add_ipv6_route(ifname, route.address(), route.network_length(), None)
.await;
if ret.is_err() {
tracing::trace!(route = ?route, err = ?ret, "add public ipv6 route failed");
} else {
cur_routes.insert(route);
}
}
}
async fn run_proxy_cidrs_route_updater(&mut self) -> Result<(), Error> { async fn run_proxy_cidrs_route_updater(&mut self) -> Result<(), Error> {
let Some(peer_mgr) = self.peer_mgr.upgrade() else { let Some(peer_mgr) = self.peer_mgr.upgrade() else {
return Err(anyhow::anyhow!("peer manager not available").into()); return Err(anyhow::anyhow!("peer manager not available").into());
@@ -1114,6 +1169,137 @@ impl NicCtx {
Ok(()) Ok(())
} }
async fn run_public_ipv6_route_updater(&mut self) -> Result<(), Error> {
let Some(peer_mgr) = self.peer_mgr.upgrade() else {
return Err(anyhow::anyhow!("peer manager not available").into());
};
let global_ctx = self.global_ctx.clone();
let net_ns = self.global_ctx.net_ns.clone();
let nic = self.nic.lock().await;
let ifcfg = nic.get_ifcfg();
let ifname = nic.ifname().to_owned();
let mut event_receiver = global_ctx.subscribe();
self.tasks.spawn(async move {
let mut cur_routes = BTreeSet::<cidr::Ipv6Inet>::new();
let initial_routes = peer_mgr.list_public_ipv6_routes().await;
let initial_added = initial_routes.iter().copied().collect::<Vec<_>>();
Self::apply_public_ipv6_route_changes(
&ifcfg,
&ifname,
&net_ns,
&mut cur_routes,
initial_added,
Vec::new(),
)
.await;
loop {
let event = match event_receiver.recv().await {
Ok(event) => event,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
event_receiver = event_receiver.resubscribe();
let latest = peer_mgr.list_public_ipv6_routes().await;
let added = latest.difference(&cur_routes).copied().collect::<Vec<_>>();
let removed = cur_routes.difference(&latest).copied().collect::<Vec<_>>();
GlobalCtxEvent::PublicIpv6RoutesUpdated(added, removed)
}
};
let (added, removed) = match event {
GlobalCtxEvent::PublicIpv6RoutesUpdated(added, removed) => (added, removed),
_ => continue,
};
Self::apply_public_ipv6_route_changes(
&ifcfg,
&ifname,
&net_ns,
&mut cur_routes,
added,
removed,
)
.await;
}
});
Ok(())
}
async fn run_public_ipv6_addr_updater(&mut self) -> Result<(), Error> {
let Some(peer_mgr) = self.peer_mgr.upgrade() else {
return Err(anyhow::anyhow!("peer manager not available").into());
};
let global_ctx = self.global_ctx.clone();
let nic = self.nic.clone();
let mut event_receiver = global_ctx.subscribe();
self.tasks.spawn(async move {
let mut current_addr = peer_mgr.get_my_public_ipv6_addr().await;
if let Some(addr) = current_addr {
let nic = nic.lock().await;
if let Err(err) = nic.link_up().await {
tracing::warn!(?err, "failed to bring public ipv6 nic link up");
}
if let Err(err) = nic.add_ipv6(addr.address(), addr.network_length() as i32).await {
tracing::warn!(addr = ?addr, ?err, "failed to add public ipv6 address");
}
if let Err(err) = nic
.add_ipv6_route_with_cost(Ipv6Addr::UNSPECIFIED, 0, Some(5))
.await
{
tracing::warn!(route = %Ipv6Addr::UNSPECIFIED, prefix = 0, ?err, "failed to add default public ipv6 route");
}
}
loop {
let event = match event_receiver.recv().await {
Ok(event) => event,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
event_receiver = event_receiver.resubscribe();
let latest = peer_mgr.get_my_public_ipv6_addr().await;
GlobalCtxEvent::PublicIpv6Changed(current_addr, latest)
}
};
let (old, new) = match event {
GlobalCtxEvent::PublicIpv6Changed(old, new) => (old, new),
_ => continue,
};
current_addr = new;
let nic = nic.lock().await;
if let Err(err) = nic.link_up().await {
tracing::warn!(?err, "failed to bring public ipv6 nic link up");
}
if let Some(old) = old {
if let Err(err) = nic.remove_ipv6_route(Ipv6Addr::UNSPECIFIED, 0).await {
tracing::warn!(route = %Ipv6Addr::UNSPECIFIED, prefix = 0, ?err, "failed to remove default public ipv6 route");
}
if let Err(err) = nic.remove_ipv6(Some(old)).await {
tracing::warn!(addr = ?old, ?err, "failed to remove old public ipv6 address");
}
}
if let Some(new) = new {
if let Err(err) = nic.add_ipv6(new.address(), new.network_length() as i32).await
{
tracing::warn!(addr = ?new, ?err, "failed to add public ipv6 address");
}
if let Err(err) = nic
.add_ipv6_route_with_cost(Ipv6Addr::UNSPECIFIED, 0, Some(5))
.await
{
tracing::warn!(route = %Ipv6Addr::UNSPECIFIED, prefix = 0, ?err, "failed to add default public ipv6 route");
}
}
}
});
Ok(())
}
pub async fn run( pub async fn run(
&mut self, &mut self,
ipv4_addr: Option<cidr::Ipv4Inet>, ipv4_addr: Option<cidr::Ipv4Inet>,
@@ -1169,6 +1355,10 @@ impl NicCtx {
} }
self.run_proxy_cidrs_route_updater().await?; self.run_proxy_cidrs_route_updater().await?;
self.run_public_ipv6_route_updater().await?;
if self.global_ctx.config.get_ipv6_public_addr_auto() {
self.run_public_ipv6_addr_updater().await?;
}
Ok(()) Ok(())
} }
+14
View File
@@ -435,6 +435,20 @@ fn handle_event(
event!(info, ?ip, "[{}] dhcp ip conflict", instance_id); event!(info, ?ip, "[{}] dhcp ip conflict", instance_id);
} }
GlobalCtxEvent::PublicIpv6Changed(old, new) => {
event!(info, ?old, ?new, "[{}] public ipv6 changed", instance_id);
}
GlobalCtxEvent::PublicIpv6RoutesUpdated(added, removed) => {
event!(
info,
?added,
?removed,
"[{}] public ipv6 routes updated",
instance_id
);
}
GlobalCtxEvent::PortForwardAdded(cfg) => { GlobalCtxEvent::PortForwardAdded(cfg) => {
event!( event!(
info, info,
+24
View File
@@ -714,6 +714,24 @@ impl NetworkConfig {
flags.use_smoltcp = use_smoltcp; flags.use_smoltcp = use_smoltcp;
} }
if let Some(ipv6_public_addr_provider) = self.ipv6_public_addr_provider {
cfg.set_ipv6_public_addr_provider(ipv6_public_addr_provider);
}
if let Some(ipv6_public_addr_auto) = self.ipv6_public_addr_auto {
cfg.set_ipv6_public_addr_auto(ipv6_public_addr_auto);
}
if let Some(ipv6_public_addr_prefix) = self
.ipv6_public_addr_prefix
.as_ref()
.filter(|prefix| !prefix.is_empty())
{
cfg.set_ipv6_public_addr_prefix(Some(ipv6_public_addr_prefix.parse().with_context(
|| format!("failed to parse ipv6 public address prefix: {ipv6_public_addr_prefix}"),
)?));
}
if let Some(disable_ipv6) = self.disable_ipv6 { if let Some(disable_ipv6) = self.disable_ipv6 {
flags.enable_ipv6 = !disable_ipv6; flags.enable_ipv6 = !disable_ipv6;
} }
@@ -863,6 +881,12 @@ impl NetworkConfig {
result.network_length = Some(ipv4.network_length() as i32); result.network_length = Some(ipv4.network_length() as i32);
} }
result.ipv6_public_addr_provider = Some(config.get_ipv6_public_addr_provider());
result.ipv6_public_addr_auto = Some(config.get_ipv6_public_addr_auto());
result.ipv6_public_addr_prefix = config
.get_ipv6_public_addr_prefix()
.map(|prefix| prefix.to_string());
let peers = config.get_peers(); let peers = config.get_peers();
result.networking_method = Some(NetworkingMethod::Manual as i32); result.networking_method = Some(NetworkingMethod::Manual as i32);
if !peers.is_empty() { if !peers.is_empty() {
+1
View File
@@ -11,6 +11,7 @@ pub mod peer_ospf_route;
pub mod peer_rpc; pub mod peer_rpc;
pub mod peer_rpc_service; pub mod peer_rpc_service;
pub mod peer_session; pub mod peer_session;
pub(crate) mod public_ipv6;
pub mod relay_peer_map; pub mod relay_peer_map;
pub mod route_trait; pub mod route_trait;
pub mod rpc_service; pub mod rpc_service;
+9
View File
@@ -1291,6 +1291,14 @@ impl PeerManager {
self.get_route().list_proxy_cidrs_v6().await self.get_route().list_proxy_cidrs_v6().await
} }
pub async fn list_public_ipv6_routes(&self) -> BTreeSet<cidr::Ipv6Inet> {
self.get_route().list_public_ipv6_routes().await
}
pub async fn get_my_public_ipv6_addr(&self) -> Option<cidr::Ipv6Inet> {
self.get_route().get_my_public_ipv6_addr().await
}
pub async fn dump_route(&self) -> String { pub async fn dump_route(&self) -> String {
self.get_route().dump().await self.get_route().dump().await
} }
@@ -1879,6 +1887,7 @@ impl PeerManager {
version: EASYTIER_VERSION.to_string(), version: EASYTIER_VERSION.to_string(),
feature_flag: Some(self.global_ctx.get_feature_flags()), feature_flag: Some(self.global_ctx.get_feature_flags()),
ip_list: Some(self.global_ctx.get_ip_collector().collect_ip_addrs().await), ip_list: Some(self.global_ctx.get_ip_collector().collect_ip_addrs().await),
public_ipv6_addr: self.get_my_public_ipv6_addr().await.map(Into::into),
} }
} }
+197 -8
View File
@@ -10,7 +10,7 @@ use std::{
}; };
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use cidr::{IpCidr, Ipv4Cidr, Ipv6Cidr}; use cidr::{IpCidr, Ipv4Cidr, Ipv6Cidr, Ipv6Inet};
use crossbeam::atomic::AtomicCell; use crossbeam::atomic::AtomicCell;
use dashmap::DashMap; use dashmap::DashMap;
use ordered_hash_map::OrderedHashMap; use ordered_hash_map::OrderedHashMap;
@@ -46,9 +46,10 @@ use crate::{
peer_rpc::{ peer_rpc::{
ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, OspfRouteRpc, ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, OspfRouteRpc,
OspfRouteRpcClientFactory, OspfRouteRpcServer, PeerGroupInfo, PeerIdVersion, OspfRouteRpcClientFactory, OspfRouteRpcServer, PeerGroupInfo, PeerIdVersion,
PeerIdentityType, RouteForeignNetworkInfos, RouteForeignNetworkSummary, RoutePeerInfo, PeerIdentityType, PublicIpv6AddrRpcServer, RouteForeignNetworkInfos,
RoutePeerInfos, SyncRouteInfoError, SyncRouteInfoRequest, SyncRouteInfoResponse, RouteForeignNetworkSummary, RoutePeerInfo, RoutePeerInfos, SyncRouteInfoError,
TrustedCredentialPubkey, TrustedCredentialPubkeyProof, route_foreign_network_infos, SyncRouteInfoRequest, SyncRouteInfoResponse, TrustedCredentialPubkey,
TrustedCredentialPubkeyProof, route_foreign_network_infos,
route_foreign_network_summary, sync_route_info_request::ConnInfo, route_foreign_network_summary, sync_route_info_request::ConnInfo,
}, },
rpc_types::{ rpc_types::{
@@ -63,6 +64,9 @@ use super::{
PeerPacketFilter, PeerPacketFilter,
graph_algo::dijkstra_with_first_hop, graph_algo::dijkstra_with_first_hop,
peer_rpc::PeerRpcManager, peer_rpc::PeerRpcManager,
public_ipv6::{
PublicIpv6PeerRouteInfo, PublicIpv6RouteControl, PublicIpv6Service, PublicIpv6SyncTrigger,
},
route_trait::{ route_trait::{
DefaultRouteCostCalculator, ForeignNetworkRouteInfoMap, NextHopPolicy, RouteCostCalculator, DefaultRouteCostCalculator, ForeignNetworkRouteInfoMap, NextHopPolicy, RouteCostCalculator,
RouteCostCalculatorInterface, RouteCostCalculatorInterface,
@@ -137,6 +141,10 @@ fn raw_credential_bytes_from_route_info(
.map(|credential| credential.encode_to_vec()) .map(|credential| credential.encode_to_vec())
} }
fn route_peer_inst_id(info: &RoutePeerInfo) -> Option<uuid::Uuid> {
info.inst_id.map(Into::into)
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct AtomicVersion(Arc<AtomicU32>); struct AtomicVersion(Arc<AtomicU32>);
@@ -205,6 +213,8 @@ impl RoutePeerInfo {
quic_port: None, quic_port: None,
noise_static_pubkey: Vec::new(), noise_static_pubkey: Vec::new(),
trusted_credential_pubkeys: Vec::new(), trusted_credential_pubkeys: Vec::new(),
ipv6_public_addr_prefix: None,
ipv6_public_addr_lease: None,
} }
} }
@@ -221,6 +231,7 @@ impl RoutePeerInfo {
my_peer_id: PeerId, my_peer_id: PeerId,
peer_route_id: u64, peer_route_id: u64,
global_ctx: &ArcGlobalCtx, global_ctx: &ArcGlobalCtx,
public_ipv6_addr_lease: Option<Ipv6Inet>,
) -> Self { ) -> Self {
let stun_info = global_ctx.get_stun_info_collector().get_stun_info(); let stun_info = global_ctx.get_stun_info_collector().get_stun_info();
let noise_static_pubkey = global_ctx let noise_static_pubkey = global_ctx
@@ -259,6 +270,14 @@ impl RoutePeerInfo {
.unwrap_or(24), .unwrap_or(24),
ipv6_addr: global_ctx.get_ipv6().map(|x| x.into()), ipv6_addr: global_ctx.get_ipv6().map(|x| x.into()),
ipv6_public_addr_prefix: global_ctx.config.get_ipv6_public_addr_prefix().map(
|prefix| {
Ipv6Inet::new(prefix.first_address(), prefix.network_length())
.unwrap()
.into()
},
),
ipv6_public_addr_lease: public_ipv6_addr_lease.map(Into::into),
groups: global_ctx.get_acl_groups(my_peer_id), groups: global_ctx.get_acl_groups(my_peer_id),
@@ -349,6 +368,7 @@ impl From<RoutePeerInfo> for crate::proto::api::instance::Route {
path_latency_latency_first: None, path_latency_latency_first: None,
ipv6_addr: val.ipv6_addr, ipv6_addr: val.ipv6_addr,
public_ipv6_addr: val.ipv6_public_addr_lease,
} }
} }
} }
@@ -964,8 +984,14 @@ impl SyncedRouteInfo {
my_peer_id: PeerId, my_peer_id: PeerId,
my_peer_route_id: u64, my_peer_route_id: u64,
global_ctx: &ArcGlobalCtx, global_ctx: &ArcGlobalCtx,
public_ipv6_addr_lease: Option<Ipv6Inet>,
) -> bool { ) -> bool {
let mut new = RoutePeerInfo::new_updated_self(my_peer_id, my_peer_route_id, global_ctx); let mut new = RoutePeerInfo::new_updated_self(
my_peer_id,
my_peer_route_id,
global_ctx,
public_ipv6_addr_lease,
);
let mut guard = self.peer_infos.upgradable_read(); let mut guard = self.peer_infos.upgradable_read();
let old = guard.get(&my_peer_id); let old = guard.get(&my_peer_id);
let new_version = old.map(|x| x.version).unwrap_or(0) + 1; let new_version = old.map(|x| x.version).unwrap_or(0) + 1;
@@ -1588,6 +1614,21 @@ impl RouteTable {
.or_insert(peer_id_and_version); .or_insert(peer_id_and_version);
} }
if let Some(ipv6_addr) = info
.ipv6_public_addr_lease
.as_ref()
.and_then(|addr| addr.address)
{
self.ipv6_peer_id_map
.entry(ipv6_addr.into())
.and_modify(|v| {
if is_new_peer_better(v) {
*v = peer_id_and_version;
}
})
.or_insert(peer_id_and_version);
}
for cidr in info.proxy_cidrs.iter() { for cidr in info.proxy_cidrs.iter() {
let Ok(cidr) = cidr.parse::<IpCidr>() else { let Ok(cidr) = cidr.parse::<IpCidr>() else {
tracing::warn!("invalid proxy cidr: {:?}, from peer: {:?}", cidr, peer_id); tracing::warn!("invalid proxy cidr: {:?}, from peer: {:?}", cidr, peer_id);
@@ -2019,6 +2060,8 @@ struct PeerRouteServiceImpl {
foreign_network_owner_map: DashMap<NetworkIdentity, Vec<PeerId>>, foreign_network_owner_map: DashMap<NetworkIdentity, Vec<PeerId>>,
foreign_network_my_peer_id_map: DashMap<(String, PeerId), PeerId>, foreign_network_my_peer_id_map: DashMap<(String, PeerId), PeerId>,
synced_route_info: SyncedRouteInfo, synced_route_info: SyncedRouteInfo,
public_ipv6_service: std::sync::Mutex<Weak<PublicIpv6Service>>,
self_public_ipv6_addr_lease: std::sync::Mutex<Option<Ipv6Inet>>,
cached_local_conn_map: std::sync::Mutex<RouteConnBitmap>, cached_local_conn_map: std::sync::Mutex<RouteConnBitmap>,
cached_local_conn_map_version: AtomicVersion, cached_local_conn_map_version: AtomicVersion,
cached_interface_peer_snapshot: std::sync::Mutex<Arc<InterfacePeerSnapshot>>, cached_interface_peer_snapshot: std::sync::Mutex<Arc<InterfacePeerSnapshot>>,
@@ -2081,6 +2124,8 @@ impl PeerRouteServiceImpl {
non_reusable_credential_owners: DashMap::new(), non_reusable_credential_owners: DashMap::new(),
version: AtomicVersion::new(), version: AtomicVersion::new(),
}, },
public_ipv6_service: std::sync::Mutex::new(Weak::new()),
self_public_ipv6_addr_lease: std::sync::Mutex::new(None),
cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::default()), cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::default()),
cached_local_conn_map_version: AtomicVersion::new(), cached_local_conn_map_version: AtomicVersion::new(),
cached_interface_peer_snapshot: std::sync::Mutex::new(Arc::new( cached_interface_peer_snapshot: std::sync::Mutex::new(Arc::new(
@@ -2119,6 +2164,20 @@ impl PeerRouteServiceImpl {
.unwrap_or(false) .unwrap_or(false)
} }
fn set_public_ipv6_service(&self, service: Weak<PublicIpv6Service>) {
*self.public_ipv6_service.lock().unwrap() = service;
}
fn public_ipv6_service(&self) -> Option<Arc<PublicIpv6Service>> {
self.public_ipv6_service.lock().unwrap().upgrade()
}
fn notify_public_ipv6_route_change(&self) -> bool {
self.public_ipv6_service()
.map(|service| service.handle_route_change())
.unwrap_or(false)
}
fn get_or_create_session(&self, dst_peer_id: PeerId) -> Arc<SyncRouteSession> { fn get_or_create_session(&self, dst_peer_id: PeerId) -> Arc<SyncRouteSession> {
self.sessions self.sessions
.entry(dst_peer_id) .entry(dst_peer_id)
@@ -2230,6 +2289,7 @@ impl PeerRouteServiceImpl {
self.my_peer_id, self.my_peer_id,
self.my_peer_route_id, self.my_peer_route_id,
&self.global_ctx, &self.global_ctx,
*self.self_public_ipv6_addr_lease.lock().unwrap(),
) )
} }
@@ -2618,14 +2678,19 @@ impl PeerRouteServiceImpl {
untrusted_changed = self.refresh_credential_trusts_and_disconnect().await; untrusted_changed = self.refresh_credential_trusts_and_disconnect().await;
} }
let mut public_ipv6_state_updated = false;
if my_peer_info_updated || my_conn_info_updated || untrusted_changed { if my_peer_info_updated || my_conn_info_updated || untrusted_changed {
self.update_route_table_and_cached_local_conn_bitmap(); self.update_route_table_and_cached_local_conn_bitmap();
self.update_foreign_network_owner_map(); self.update_foreign_network_owner_map();
public_ipv6_state_updated = self.notify_public_ipv6_route_change();
} }
if my_peer_info_updated { if my_peer_info_updated {
self.update_peer_info_last_update(); self.update_peer_info_last_update();
} }
my_peer_info_updated || my_conn_info_updated || my_foreign_network_updated my_peer_info_updated
|| my_conn_info_updated
|| my_foreign_network_updated
|| public_ipv6_state_updated
} }
async fn refresh_acl_groups(&self) -> bool { async fn refresh_acl_groups(&self) -> bool {
@@ -2652,15 +2717,17 @@ impl PeerRouteServiceImpl {
let untrusted = self.refresh_credential_trusts_with_current_topology(); let untrusted = self.refresh_credential_trusts_with_current_topology();
self.disconnect_untrusted_peers(&untrusted).await; self.disconnect_untrusted_peers(&untrusted).await;
let mut public_ipv6_state_updated = false;
if my_peer_info_updated || !untrusted.is_empty() { if my_peer_info_updated || !untrusted.is_empty() {
self.update_route_table_and_cached_local_conn_bitmap(); self.update_route_table_and_cached_local_conn_bitmap();
self.update_foreign_network_owner_map(); self.update_foreign_network_owner_map();
public_ipv6_state_updated = self.notify_public_ipv6_route_change();
} }
if my_peer_info_updated { if my_peer_info_updated {
self.update_peer_info_last_update(); self.update_peer_info_last_update();
} }
my_peer_info_updated || !untrusted.is_empty() my_peer_info_updated || !untrusted.is_empty() || public_ipv6_state_updated
} }
fn refresh_credential_trusts(&self) -> Vec<PeerId> { fn refresh_credential_trusts(&self) -> Vec<PeerId> {
@@ -2968,7 +3035,6 @@ impl PeerRouteServiceImpl {
session session
.update_dst_saved_foreign_network_version(foreign_network, dst_peer_id); .update_dst_saved_foreign_network_version(foreign_network, dst_peer_id);
} }
session.update_last_sync_succ_timestamp(next_last_sync_succ_timestamp); session.update_last_sync_succ_timestamp(next_last_sync_succ_timestamp);
} }
} }
@@ -3493,7 +3559,13 @@ impl RouteSessionManager {
} }
if need_update_route_table || foreign_network_changed { if need_update_route_table || foreign_network_changed {
service_impl.update_route_table_and_cached_local_conn_bitmap();
service_impl.update_foreign_network_owner_map(); service_impl.update_foreign_network_owner_map();
if need_update_route_table
&& let Some(public_ipv6_service) = service_impl.public_ipv6_service()
{
public_ipv6_service.handle_route_change();
}
} }
tracing::debug!( tracing::debug!(
@@ -3534,12 +3606,86 @@ impl RouteSessionManager {
} }
} }
struct OspfPublicIpv6RouteHandle {
service_impl: Weak<PeerRouteServiceImpl>,
}
impl PublicIpv6RouteControl for OspfPublicIpv6RouteHandle {
fn my_peer_id(&self) -> PeerId {
self.service_impl
.upgrade()
.map(|service_impl| service_impl.my_peer_id)
.unwrap_or_default()
}
fn peer_route_snapshot(&self) -> Vec<PublicIpv6PeerRouteInfo> {
let Some(service_impl) = self.service_impl.upgrade() else {
return Vec::new();
};
service_impl
.synced_route_info
.peer_infos
.read()
.iter()
.map(|(peer_id, info)| PublicIpv6PeerRouteInfo {
peer_id: *peer_id,
inst_id: route_peer_inst_id(info),
is_provider: info
.feature_flag
.as_ref()
.map(|flags| flags.ipv6_public_addr_provider)
.unwrap_or(false),
prefix: info
.ipv6_public_addr_prefix
.map(Into::into)
.map(|prefix: Ipv6Inet| prefix.network()),
lease: info.ipv6_public_addr_lease.map(Into::into),
reachable: *peer_id == service_impl.my_peer_id
|| service_impl.route_table.peer_reachable(*peer_id),
})
.collect()
}
fn publish_self_public_ipv6_lease(&self, lease: Option<Ipv6Inet>) -> bool {
let Some(service_impl) = self.service_impl.upgrade() else {
return false;
};
let mut current = service_impl.self_public_ipv6_addr_lease.lock().unwrap();
if *current == lease {
return false;
}
*current = lease;
drop(current);
let changed = service_impl.update_my_peer_info();
if changed {
service_impl.update_route_table_and_cached_local_conn_bitmap();
service_impl.update_foreign_network_owner_map();
}
changed
}
}
#[derive(Clone)]
struct OspfPublicIpv6SyncTrigger {
session_mgr: RouteSessionManager,
}
impl PublicIpv6SyncTrigger for OspfPublicIpv6SyncTrigger {
fn sync_now(&self, reason: &str) {
self.session_mgr.sync_now(reason);
}
}
pub struct PeerRoute { pub struct PeerRoute {
my_peer_id: PeerId, my_peer_id: PeerId,
global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx,
peer_rpc: Weak<PeerRpcManager>, peer_rpc: Weak<PeerRpcManager>,
service_impl: Arc<PeerRouteServiceImpl>, service_impl: Arc<PeerRouteServiceImpl>,
public_ipv6_service: Arc<PublicIpv6Service>,
session_mgr: RouteSessionManager, session_mgr: RouteSessionManager,
tasks: std::sync::Mutex<JoinSet<()>>, tasks: std::sync::Mutex<JoinSet<()>>,
@@ -3563,6 +3709,17 @@ impl PeerRoute {
) -> Arc<Self> { ) -> Arc<Self> {
let service_impl = Arc::new(PeerRouteServiceImpl::new(my_peer_id, global_ctx.clone())); let service_impl = Arc::new(PeerRouteServiceImpl::new(my_peer_id, global_ctx.clone()));
let session_mgr = RouteSessionManager::new(service_impl.clone(), peer_rpc.clone()); let session_mgr = RouteSessionManager::new(service_impl.clone(), peer_rpc.clone());
let public_ipv6_service = Arc::new(PublicIpv6Service::new(
global_ctx.clone(),
Arc::downgrade(&peer_rpc),
Arc::new(OspfPublicIpv6RouteHandle {
service_impl: Arc::downgrade(&service_impl),
}),
Arc::new(OspfPublicIpv6SyncTrigger {
session_mgr: session_mgr.clone(),
}),
));
service_impl.set_public_ipv6_service(Arc::downgrade(&public_ipv6_service));
Arc::new(PeerRoute { Arc::new(PeerRoute {
my_peer_id, my_peer_id,
@@ -3570,6 +3727,7 @@ impl PeerRoute {
peer_rpc: Arc::downgrade(&peer_rpc), peer_rpc: Arc::downgrade(&peer_rpc),
service_impl, service_impl,
public_ipv6_service,
session_mgr, session_mgr,
tasks: std::sync::Mutex::new(JoinSet::new()), tasks: std::sync::Mutex::new(JoinSet::new()),
@@ -3607,6 +3765,9 @@ impl PeerRoute {
tracing::debug!("cost_calculator_need_update"); tracing::debug!("cost_calculator_need_update");
service_impl.synced_route_info.version.inc(); service_impl.synced_route_info.version.inc();
service_impl.update_route_table(); service_impl.update_route_table();
if let Some(public_ipv6_service) = service_impl.public_ipv6_service() {
public_ipv6_service.handle_route_change();
}
} }
select! { select! {
@@ -3631,11 +3792,16 @@ impl PeerRoute {
// make sure my_peer_id is in the peer_infos. // make sure my_peer_id is in the peer_infos.
self.service_impl.update_my_infos().await; self.service_impl.update_my_infos().await;
self.public_ipv6_service.handle_route_change();
peer_rpc.rpc_server().registry().register( peer_rpc.rpc_server().registry().register(
OspfRouteRpcServer::new(self.session_mgr.clone()), OspfRouteRpcServer::new(self.session_mgr.clone()),
&self.global_ctx.get_network_name(), &self.global_ctx.get_network_name(),
); );
peer_rpc.rpc_server().registry().register(
PublicIpv6AddrRpcServer::new(self.public_ipv6_service.rpc_server()),
&self.global_ctx.get_network_name(),
);
self.tasks self.tasks
.lock() .lock()
@@ -3657,6 +3823,16 @@ impl PeerRoute {
.lock() .lock()
.unwrap() .unwrap()
.spawn(Self::clear_expired_peer(self.service_impl.clone())); .spawn(Self::clear_expired_peer(self.service_impl.clone()));
self.tasks
.lock()
.unwrap()
.spawn(self.public_ipv6_service.clone().provider_gc_routine());
self.tasks
.lock()
.unwrap()
.spawn(self.public_ipv6_service.clone().client_routine());
} }
} }
@@ -3677,6 +3853,10 @@ impl Drop for PeerRoute {
OspfRouteRpcServer::new(self.session_mgr.clone()), OspfRouteRpcServer::new(self.session_mgr.clone()),
&self.global_ctx.get_network_name(), &self.global_ctx.get_network_name(),
); );
peer_rpc.rpc_server().registry().unregister(
PublicIpv6AddrRpcServer::new(self.public_ipv6_service.rpc_server()),
&self.global_ctx.get_network_name(),
);
} }
} }
@@ -3765,6 +3945,14 @@ impl Route for PeerRoute {
.collect() .collect()
} }
async fn list_public_ipv6_routes(&self) -> BTreeSet<Ipv6Inet> {
self.public_ipv6_service.list_routes()
}
async fn get_my_public_ipv6_addr(&self) -> Option<Ipv6Inet> {
self.public_ipv6_service.my_addr()
}
async fn get_peer_id_by_ipv4(&self, ipv4_addr: &Ipv4Addr) -> Option<PeerId> { async fn get_peer_id_by_ipv4(&self, ipv4_addr: &Ipv4Addr) -> Option<PeerId> {
let route_table = &self.service_impl.route_table; let route_table = &self.service_impl.route_table;
if let Some(p) = route_table.ipv4_peer_id_map.get(ipv4_addr) { if let Some(p) = route_table.ipv4_peer_id_map.get(ipv4_addr) {
@@ -5180,6 +5368,7 @@ mod tests {
service_impl.my_peer_id, service_impl.my_peer_id,
service_impl.my_peer_route_id, service_impl.my_peer_route_id,
&service_impl.global_ctx, &service_impl.global_ctx,
None,
); );
let mut self_info = self_info; let mut self_info = self_info;
self_info.version = 1; self_info.version = 1;
+928
View File
@@ -0,0 +1,928 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
net::Ipv6Addr,
sync::{Arc, Weak},
time::{Duration, SystemTime},
};
use cidr::{Ipv6Cidr, Ipv6Inet};
use crate::{
common::{
PeerId,
global_ctx::{ArcGlobalCtx, GlobalCtxEvent},
},
proto::{
peer_rpc::{
AcquireIpv6PublicAddrLeaseRequest, GetIpv6PublicAddrLeaseRequest,
Ipv6PublicAddrLeaseReply, PublicIpv6AddrRpc, PublicIpv6AddrRpcClientFactory,
ReleaseIpv6PublicAddrLeaseRequest, RenewIpv6PublicAddrLeaseRequest,
},
rpc_types::{
self,
controller::{BaseController, Controller},
},
},
};
use super::peer_rpc::PeerRpcManager;
// Use a longer lease with an early renew window to reduce steady-state RPC
// churn while preserving enough margin for transient provider failures.
static PUBLIC_IPV6_LEASE_TTL: Duration = Duration::from_secs(120);
static PUBLIC_IPV6_RENEW_INTERVAL: Duration = Duration::from_secs(40);
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PublicIpv6Provider {
pub peer_id: PeerId,
pub inst_id: uuid::Uuid,
pub prefix: Ipv6Cidr,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PublicIpv6ProviderLease {
pub peer_id: PeerId,
pub inst_id: uuid::Uuid,
pub addr: Ipv6Inet,
pub valid_until: SystemTime,
pub reused: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct PublicIpv6ProviderState {
provider: PublicIpv6Provider,
leases: BTreeMap<uuid::Uuid, PublicIpv6ProviderLease>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct PublicIpv6ClientState {
provider: PublicIpv6Provider,
lease: PublicIpv6ProviderLease,
last_error: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PublicIpv6PeerRouteInfo {
pub peer_id: PeerId,
pub inst_id: Option<uuid::Uuid>,
pub is_provider: bool,
pub prefix: Option<Ipv6Cidr>,
pub lease: Option<Ipv6Inet>,
pub reachable: bool,
}
pub(crate) trait PublicIpv6RouteControl: Send + Sync {
fn my_peer_id(&self) -> PeerId;
fn peer_route_snapshot(&self) -> Vec<PublicIpv6PeerRouteInfo>;
fn publish_self_public_ipv6_lease(&self, lease: Option<Ipv6Inet>) -> bool;
}
pub(crate) trait PublicIpv6SyncTrigger: Send + Sync {
fn sync_now(&self, reason: &str);
}
pub(crate) struct PublicIpv6Service {
global_ctx: ArcGlobalCtx,
peer_rpc: Weak<PeerRpcManager>,
route_control: Arc<dyn PublicIpv6RouteControl>,
sync_trigger: Arc<dyn PublicIpv6SyncTrigger>,
provider_state: std::sync::Mutex<Option<PublicIpv6ProviderState>>,
client_state: std::sync::Mutex<Option<PublicIpv6ClientState>>,
route_cache: std::sync::Mutex<BTreeSet<Ipv6Inet>>,
my_addr_cache: std::sync::Mutex<Option<Ipv6Inet>>,
}
impl PublicIpv6Service {
pub(crate) fn new(
global_ctx: ArcGlobalCtx,
peer_rpc: Weak<PeerRpcManager>,
route_control: Arc<dyn PublicIpv6RouteControl>,
sync_trigger: Arc<dyn PublicIpv6SyncTrigger>,
) -> Self {
Self {
global_ctx,
peer_rpc,
route_control,
sync_trigger,
provider_state: std::sync::Mutex::new(None),
client_state: std::sync::Mutex::new(None),
route_cache: std::sync::Mutex::new(BTreeSet::new()),
my_addr_cache: std::sync::Mutex::new(None),
}
}
pub(crate) fn rpc_server(self: &Arc<Self>) -> PublicIpv6AddrRpcServerImpl {
PublicIpv6AddrRpcServerImpl {
service: Arc::downgrade(self),
}
}
fn my_peer_id(&self) -> PeerId {
self.route_control.my_peer_id()
}
fn selected_provider(&self) -> Option<PublicIpv6Provider> {
Self::selected_provider_from_snapshot(&self.route_control.peer_route_snapshot())
}
fn current_provider_state(&self) -> Option<PublicIpv6ProviderState> {
self.provider_state.lock().unwrap().clone()
}
fn current_client_state(&self) -> Option<PublicIpv6ClientState> {
self.client_state.lock().unwrap().clone()
}
fn set_provider_state(&self, next: Option<PublicIpv6ProviderState>) -> bool {
let mut guard = self.provider_state.lock().unwrap();
if *guard == next {
return false;
}
*guard = next;
true
}
fn set_client_state(&self, next: Option<PublicIpv6ClientState>) -> bool {
let mut guard = self.client_state.lock().unwrap();
if *guard == next {
return false;
}
*guard = next;
true
}
fn selected_provider_from_snapshot(
peers: &[PublicIpv6PeerRouteInfo],
) -> Option<PublicIpv6Provider> {
peers
.iter()
.filter(|info| info.is_provider)
.filter(|info| info.reachable)
.filter_map(|info| {
Some(PublicIpv6Provider {
peer_id: info.peer_id,
inst_id: info.inst_id?,
prefix: info.prefix?,
})
})
.min_by_key(|provider| provider.inst_id)
}
fn clear_provider_state_if_provider_changed(
&self,
provider: Option<&PublicIpv6Provider>,
) -> bool {
let current = self.current_provider_state();
let should_clear = current
.as_ref()
.is_some_and(|state| provider != Some(&state.provider));
should_clear && self.set_provider_state(None)
}
fn clear_client_state_if_provider_changed(
&self,
provider: Option<&PublicIpv6Provider>,
) -> bool {
let current = self.current_client_state();
let should_clear = current
.as_ref()
.is_some_and(|state| provider != Some(&state.provider));
should_clear && self.set_client_state(None)
}
fn collect_runtime_from_snapshot(
&self,
peers: &[PublicIpv6PeerRouteInfo],
) -> (Option<Ipv6Inet>, BTreeSet<Ipv6Inet>) {
let mut my_addr = self.current_client_state().map(|state| state.lease.addr);
let mut routes = BTreeSet::new();
for info in peers {
let Some(lease) = info.lease else {
continue;
};
if info.peer_id == self.my_peer_id() {
my_addr = Some(lease);
continue;
}
if info.reachable {
routes.insert(lease);
}
}
(my_addr, routes)
}
fn reconcile_runtime_from_snapshot(&self, peers: &[PublicIpv6PeerRouteInfo]) {
let (mut my_addr, routes) = self.collect_runtime_from_snapshot(peers);
if !self.global_ctx.config.get_ipv6_public_addr_auto() {
my_addr = None;
}
let mut cached_my_addr = self.my_addr_cache.lock().unwrap();
if *cached_my_addr != my_addr {
let old = *cached_my_addr;
*cached_my_addr = my_addr;
if self.global_ctx.config.get_ipv6_public_addr_auto() {
self.global_ctx.set_ipv6(my_addr);
}
self.global_ctx
.issue_event(GlobalCtxEvent::PublicIpv6Changed(old, my_addr));
}
drop(cached_my_addr);
let mut cached_routes = self.route_cache.lock().unwrap();
if *cached_routes != routes {
let added = routes
.difference(&cached_routes)
.copied()
.collect::<Vec<_>>();
let removed = cached_routes
.difference(&routes)
.copied()
.collect::<Vec<_>>();
*cached_routes = routes;
self.global_ctx
.issue_event(GlobalCtxEvent::PublicIpv6RoutesUpdated(added, removed));
}
}
fn reconcile_runtime(&self) {
let peers = self.route_control.peer_route_snapshot();
self.reconcile_runtime_from_snapshot(&peers);
}
pub(crate) fn handle_route_change(&self) -> bool {
let peers = self.route_control.peer_route_snapshot();
let provider = Self::selected_provider_from_snapshot(&peers);
let _provider_changed = self.clear_provider_state_if_provider_changed(provider.as_ref());
let client_changed = self.clear_client_state_if_provider_changed(provider.as_ref());
let peer_info_changed = if client_changed {
self.publish_current_client_lease()
} else {
false
};
// When client state changed, publish_current_client_lease() mutated the
// local peer info synchronously, so the pre-update snapshot is stale for
// this node's own entry. Re-fetch to avoid reconciling against old data.
if client_changed {
self.reconcile_runtime();
} else {
self.reconcile_runtime_from_snapshot(&peers);
}
peer_info_changed
}
fn publish_current_client_lease(&self) -> bool {
self.route_control.publish_self_public_ipv6_lease(
self.current_client_state()
.as_ref()
.map(|state| state.lease.addr),
)
}
fn clear_client_lease_state(&self, mut state_changed: bool) -> bool {
state_changed |= self.set_client_state(None);
let peer_info_changed = if state_changed {
self.route_control.publish_self_public_ipv6_lease(None)
} else {
false
};
if state_changed {
// publish_self_public_ipv6_lease mutated the local peer info above,
// so the snapshot passed in is stale for this node.
self.reconcile_runtime();
}
peer_info_changed
}
fn build_lease_reply(
provider: &PublicIpv6Provider,
lease: Option<&PublicIpv6ProviderLease>,
error_msg: Option<String>,
) -> Ipv6PublicAddrLeaseReply {
Ipv6PublicAddrLeaseReply {
provider_peer_id: provider.peer_id,
provider_inst_id: Some(provider.inst_id.into()),
provider_prefix: Some(
Ipv6Inet::new(
provider.prefix.first_address(),
provider.prefix.network_length(),
)
.unwrap()
.into(),
),
leased_addr: lease.map(|lease| lease.addr.into()),
valid_until: lease.map(|lease| lease.valid_until.into()),
reused: lease.map(|lease| lease.reused).unwrap_or(false),
error_msg,
}
}
async fn collect_reserved_addrs(&self, prefix: Ipv6Cidr) -> HashSet<Ipv6Addr> {
let mut reserved = HashSet::new();
let ip_list = self.global_ctx.get_ip_collector().collect_ip_addrs().await;
reserved.extend(
ip_list
.interface_ipv6s
.into_iter()
.map(Ipv6Addr::from)
.filter(|addr| prefix.contains(addr)),
);
reserved.extend(
ip_list
.public_ipv6
.into_iter()
.map(Ipv6Addr::from)
.filter(|addr| prefix.contains(addr)),
);
reserved
}
fn prune_expired_leases(
provider: &PublicIpv6Provider,
current: Option<PublicIpv6ProviderState>,
) -> PublicIpv6ProviderState {
let mut state = current.unwrap_or_else(|| PublicIpv6ProviderState {
provider: provider.clone(),
leases: BTreeMap::new(),
});
state.provider = provider.clone();
let now = SystemTime::now();
state.leases.retain(|_, lease| lease.valid_until > now);
state
}
async fn acquire_lease(
&self,
requester_peer_id: PeerId,
requester_inst_id: uuid::Uuid,
renew_only: bool,
requested_addr: Option<Ipv6Inet>,
) -> Result<PublicIpv6ProviderLease, String> {
let provider = self
.selected_provider()
.ok_or_else(|| "no active ipv6 public address provider".to_string())?;
if provider.peer_id != self.my_peer_id() {
return Err("this peer is not the selected ipv6 public address provider".to_string());
}
let mut state = Self::prune_expired_leases(&provider, self.current_provider_state());
if let Some(existing) = state.leases.get_mut(&requester_inst_id) {
if requested_addr.is_some() && requested_addr != Some(existing.addr) {
return Err("requested lease does not match the active allocation".to_string());
}
existing.peer_id = requester_peer_id;
existing.valid_until = SystemTime::now() + PUBLIC_IPV6_LEASE_TTL;
existing.reused = true;
let lease = existing.clone();
self.set_provider_state(Some(state));
return Ok(lease);
}
if renew_only {
return Err("lease not found".to_string());
}
let mut reserved = self.collect_reserved_addrs(provider.prefix).await;
let old_map = state
.leases
.iter()
.map(|(inst_id, lease)| {
reserved.insert(lease.addr.address());
(*inst_id, lease.addr)
})
.collect::<HashMap<_, _>>();
let mut allocated =
allocate_public_ipv6_leases(provider.prefix, &[requester_inst_id], &reserved, &old_map);
let Some(mut lease) = allocated.pop() else {
return Err(format!(
"no free ipv6 address left in provider prefix {}",
provider.prefix
));
};
lease.peer_id = requester_peer_id;
lease.valid_until = SystemTime::now() + PUBLIC_IPV6_LEASE_TTL;
state.leases.insert(requester_inst_id, lease.clone());
self.set_provider_state(Some(state));
Ok(lease)
}
fn release_lease(&self, requester_peer_id: PeerId, requester_inst_id: uuid::Uuid) -> bool {
let Some(provider) = self.selected_provider() else {
return false;
};
if provider.peer_id != self.my_peer_id() {
return false;
}
let mut state = Self::prune_expired_leases(&provider, self.current_provider_state());
let removed = state
.leases
.get(&requester_inst_id)
.map(|lease| lease.peer_id == requester_peer_id)
.unwrap_or(false);
if !removed {
return false;
}
state.leases.remove(&requester_inst_id);
self.set_provider_state(Some(state))
}
fn get_lease(
&self,
requester_peer_id: PeerId,
requester_inst_id: uuid::Uuid,
requested_addr: Option<Ipv6Inet>,
) -> Result<(PublicIpv6Provider, PublicIpv6ProviderLease), String> {
let provider = self
.selected_provider()
.ok_or_else(|| "no active ipv6 public address provider".to_string())?;
if provider.peer_id != self.my_peer_id() {
return Err("this peer is not the selected ipv6 public address provider".to_string());
}
let state = Self::prune_expired_leases(&provider, self.current_provider_state());
let Some(lease) = state.leases.get(&requester_inst_id) else {
return Err("lease not found".to_string());
};
if lease.peer_id != requester_peer_id {
return Err("lease owner mismatch".to_string());
}
if requested_addr.is_some() && requested_addr != Some(lease.addr) {
return Err("requested lease does not match the active allocation".to_string());
}
Ok((provider, lease.clone()))
}
pub(crate) async fn gc_provider_leases(&self) {
let peers = self.route_control.peer_route_snapshot();
let provider = Self::selected_provider_from_snapshot(&peers);
self.clear_provider_state_if_provider_changed(provider.as_ref());
let Some(provider) = provider else {
return;
};
if provider.peer_id != self.my_peer_id() {
return;
}
let state = Self::prune_expired_leases(&provider, self.current_provider_state());
self.set_provider_state(Some(state));
}
pub(crate) async fn sync_client_state(&self) -> bool {
if !self.global_ctx.config.get_ipv6_public_addr_auto() {
return self
.clear_client_lease_state(self.clear_client_state_if_provider_changed(None));
}
let peers = self.route_control.peer_route_snapshot();
let provider = Self::selected_provider_from_snapshot(&peers);
self.clear_provider_state_if_provider_changed(provider.as_ref());
let state_changed = self.clear_client_state_if_provider_changed(provider.as_ref());
let Some(provider) = provider else {
return self.clear_client_lease_state(state_changed);
};
if provider.peer_id == self.my_peer_id() {
return self.clear_client_lease_state(state_changed);
}
let current = self.current_client_state();
let need_rpc = current.as_ref().is_none_or(|state| {
state.provider != provider
|| state.lease.valid_until <= SystemTime::now() + PUBLIC_IPV6_RENEW_INTERVAL
});
if !need_rpc {
if state_changed {
self.reconcile_runtime();
}
return false;
}
let Some(peer_rpc) = self.peer_rpc.upgrade() else {
if state_changed {
self.reconcile_runtime();
}
return false;
};
let mut ctrl = BaseController::default();
ctrl.set_timeout_ms(3000);
let rpc_stub = peer_rpc
.rpc_client()
.scoped_client::<PublicIpv6AddrRpcClientFactory<BaseController>>(
self.my_peer_id(),
provider.peer_id,
self.global_ctx.get_network_name(),
);
let inst_id = self.global_ctx.get_id();
let reply = if let Some(state) = current.as_ref().filter(|state| state.provider == provider)
{
match rpc_stub
.renew_lease(
ctrl.clone(),
RenewIpv6PublicAddrLeaseRequest {
peer_id: self.my_peer_id(),
inst_id: Some(inst_id.into()),
leased_addr: Some(state.lease.addr.into()),
},
)
.await
{
Ok(reply) if reply.error_msg.is_none() => Ok(reply),
Ok(_) | Err(_) => {
rpc_stub
.acquire_lease(
ctrl.clone(),
AcquireIpv6PublicAddrLeaseRequest {
peer_id: self.my_peer_id(),
inst_id: Some(inst_id.into()),
},
)
.await
}
}
} else {
rpc_stub
.acquire_lease(
ctrl,
AcquireIpv6PublicAddrLeaseRequest {
peer_id: self.my_peer_id(),
inst_id: Some(inst_id.into()),
},
)
.await
};
let mut state_changed = state_changed;
match reply {
Ok(reply) if reply.error_msg.is_none() => {
let Some(leased_addr) = reply.leased_addr.map(Into::into) else {
return false;
};
let valid_until = reply
.valid_until
.and_then(|ts| SystemTime::try_from(ts).ok())
.unwrap_or_else(|| SystemTime::now() + PUBLIC_IPV6_LEASE_TTL);
let next_state = PublicIpv6ClientState {
provider: provider.clone(),
lease: PublicIpv6ProviderLease {
peer_id: self.my_peer_id(),
inst_id,
addr: leased_addr,
valid_until,
reused: reply.reused,
},
last_error: None,
};
state_changed |= self.set_client_state(Some(next_state));
}
Ok(_) | Err(_) => {
let should_clear = current
.as_ref()
.map(|state| state.lease.valid_until <= SystemTime::now())
.unwrap_or(true);
if should_clear {
state_changed |= self.set_client_state(None);
}
}
}
let peer_info_changed = if state_changed {
self.publish_current_client_lease()
} else {
false
};
if state_changed {
self.reconcile_runtime();
}
peer_info_changed
}
pub(crate) async fn provider_gc_routine(self: Arc<Self>) {
if !self.global_ctx.config.get_ipv6_public_addr_provider() {
return;
}
loop {
tokio::time::sleep(Duration::from_secs(15)).await;
self.gc_provider_leases().await;
}
}
pub(crate) async fn client_routine(self: Arc<Self>) {
loop {
if self.sync_client_state().await {
self.sync_trigger.sync_now("sync_public_ipv6_client_state");
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
pub(crate) fn list_routes(&self) -> BTreeSet<Ipv6Inet> {
self.route_cache.lock().unwrap().clone()
}
pub(crate) fn my_addr(&self) -> Option<Ipv6Inet> {
*self.my_addr_cache.lock().unwrap()
}
}
#[derive(Clone)]
pub(crate) struct PublicIpv6AddrRpcServerImpl {
service: Weak<PublicIpv6Service>,
}
impl PublicIpv6AddrRpcServerImpl {
fn selected_provider(
service: &PublicIpv6Service,
) -> rpc_types::error::Result<PublicIpv6Provider> {
service
.selected_provider()
.ok_or_else(|| anyhow::anyhow!("provider not available").into())
}
fn build_error_reply(
service: &PublicIpv6Service,
error_msg: String,
) -> rpc_types::error::Result<Ipv6PublicAddrLeaseReply> {
Ok(PublicIpv6Service::build_lease_reply(
&Self::selected_provider(service)?,
None,
Some(error_msg),
))
}
}
#[async_trait::async_trait]
impl PublicIpv6AddrRpc for PublicIpv6AddrRpcServerImpl {
type Controller = BaseController;
async fn acquire_lease(
&self,
_: BaseController,
request: AcquireIpv6PublicAddrLeaseRequest,
) -> rpc_types::error::Result<Ipv6PublicAddrLeaseReply> {
let Some(service) = self.service.upgrade() else {
return Err(anyhow::anyhow!("public ipv6 service stopped").into());
};
let inst_id: uuid::Uuid = request
.inst_id
.ok_or_else(|| anyhow::anyhow!("inst_id is required"))?
.into();
match service
.acquire_lease(request.peer_id, inst_id, false, None)
.await
{
Ok(lease) => Ok(PublicIpv6Service::build_lease_reply(
&Self::selected_provider(&service)?,
Some(&lease),
None,
)),
Err(error_msg) => Self::build_error_reply(&service, error_msg),
}
}
async fn renew_lease(
&self,
_: BaseController,
request: RenewIpv6PublicAddrLeaseRequest,
) -> rpc_types::error::Result<Ipv6PublicAddrLeaseReply> {
let Some(service) = self.service.upgrade() else {
return Err(anyhow::anyhow!("public ipv6 service stopped").into());
};
let inst_id: uuid::Uuid = request
.inst_id
.ok_or_else(|| anyhow::anyhow!("inst_id is required"))?
.into();
let requested_addr = request.leased_addr.map(Into::into);
match service
.acquire_lease(request.peer_id, inst_id, true, requested_addr)
.await
{
Ok(lease) => Ok(PublicIpv6Service::build_lease_reply(
&Self::selected_provider(&service)?,
Some(&lease),
None,
)),
Err(error_msg) => Self::build_error_reply(&service, error_msg),
}
}
async fn release_lease(
&self,
_: BaseController,
request: ReleaseIpv6PublicAddrLeaseRequest,
) -> rpc_types::error::Result<crate::proto::common::Void> {
let Some(service) = self.service.upgrade() else {
return Err(anyhow::anyhow!("public ipv6 service stopped").into());
};
let inst_id: uuid::Uuid = request
.inst_id
.ok_or_else(|| anyhow::anyhow!("inst_id is required"))?
.into();
service.release_lease(request.peer_id, inst_id);
Ok(Default::default())
}
async fn get_lease(
&self,
_: BaseController,
request: GetIpv6PublicAddrLeaseRequest,
) -> rpc_types::error::Result<Ipv6PublicAddrLeaseReply> {
let Some(service) = self.service.upgrade() else {
return Err(anyhow::anyhow!("public ipv6 service stopped").into());
};
let inst_id: uuid::Uuid = request
.inst_id
.ok_or_else(|| anyhow::anyhow!("inst_id is required"))?
.into();
match service.get_lease(request.peer_id, inst_id, None) {
Ok((provider, lease)) => Ok(PublicIpv6Service::build_lease_reply(
&provider,
Some(&lease),
None,
)),
Err(error_msg) => Self::build_error_reply(&service, error_msg),
}
}
}
fn allocate_public_ipv6_leases(
prefix: Ipv6Cidr,
auto_peer_ids: &[uuid::Uuid],
reserved: &HashSet<Ipv6Addr>,
old_map: &HashMap<uuid::Uuid, Ipv6Inet>,
) -> Vec<PublicIpv6ProviderLease> {
let prefix_len = prefix.network_length();
let host_bits = 128_u32.saturating_sub(prefix_len as u32);
let max_offsets = if host_bits == 128 {
None
} else {
Some(1_u128 << host_bits)
};
let network = u128::from(prefix.first_address());
let mut used_offsets = reserved
.iter()
.filter(|addr| prefix.contains(addr))
.map(|addr| u128::from(*addr).saturating_sub(network))
.collect::<HashSet<_>>();
let mut leases = Vec::with_capacity(auto_peer_ids.len());
for inst_id in auto_peer_ids.iter().copied() {
let addr = if let Some(existing) = old_map.get(&inst_id).copied()
&& prefix.contains(&existing.address())
&& used_offsets.insert(u128::from(existing.address()).saturating_sub(network))
{
existing
} else {
let Some(max_offsets) = max_offsets else {
continue;
};
let usable_slots = max_offsets.saturating_sub(1);
let offset = if usable_slots == 0 {
used_offsets.insert(0).then_some(0)
} else {
let start_offset = (inst_id.as_u128() % usable_slots) + 1;
(0..usable_slots)
.map(|step| ((start_offset - 1 + step) % usable_slots) + 1)
.find(|offset| used_offsets.insert(*offset))
};
let Some(offset) = offset else {
break;
};
Ipv6Inet::new(Ipv6Addr::from(network + offset), 128).unwrap()
};
leases.push(PublicIpv6ProviderLease {
peer_id: 0,
inst_id,
addr,
valid_until: SystemTime::UNIX_EPOCH,
reused: old_map
.get(&inst_id)
.map(|old| *old == addr)
.unwrap_or(false),
});
}
leases
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::net::Ipv6Addr;
use cidr::Ipv6Cidr;
use super::{PublicIpv6PeerRouteInfo, PublicIpv6Service, allocate_public_ipv6_leases};
#[test]
fn public_ipv6_lease_allocator_keeps_stable_addresses() {
let prefix = "2001:db8::/124".parse::<Ipv6Cidr>().unwrap();
let first = uuid::Uuid::from_u128(1);
let second = uuid::Uuid::from_u128(2);
let leases =
allocate_public_ipv6_leases(prefix, &[first, second], &HashSet::new(), &HashMap::new());
assert_eq!(leases.len(), 2);
assert_ne!(leases[0].addr, leases[1].addr);
let initial_map = HashMap::from([(first, leases[0].addr)]);
let next = allocate_public_ipv6_leases(prefix, &[first], &HashSet::new(), &initial_map);
assert_eq!(next.len(), 1);
assert_eq!(next[0].addr, leases[0].addr);
assert!(next[0].reused);
}
#[test]
fn public_ipv6_provider_prefers_smallest_instance_id() {
let info_a = PublicIpv6PeerRouteInfo {
peer_id: 2,
inst_id: Some(uuid::Uuid::from_u128(2)),
is_provider: true,
prefix: Some("2001:db8:1::/120".parse().unwrap()),
lease: None,
reachable: true,
};
let info_b = PublicIpv6PeerRouteInfo {
peer_id: 1,
inst_id: Some(uuid::Uuid::from_u128(1)),
is_provider: true,
prefix: Some("2001:db8:2::/120".parse().unwrap()),
lease: None,
reachable: true,
};
let selected =
PublicIpv6Service::selected_provider_from_snapshot(&[info_a, info_b]).unwrap();
assert_eq!(selected.peer_id, 1);
}
#[test]
fn public_ipv6_provider_prefers_reachable_provider() {
let unreachable_lower_id = PublicIpv6PeerRouteInfo {
peer_id: 1,
inst_id: Some(uuid::Uuid::from_u128(1)),
is_provider: true,
prefix: Some("2001:db8:1::/120".parse().unwrap()),
lease: None,
reachable: false,
};
let reachable_higher_id = PublicIpv6PeerRouteInfo {
peer_id: 2,
inst_id: Some(uuid::Uuid::from_u128(2)),
is_provider: true,
prefix: Some("2001:db8:2::/120".parse().unwrap()),
lease: None,
reachable: true,
};
let selected = PublicIpv6Service::selected_provider_from_snapshot(&[
unreachable_lower_id,
reachable_higher_id,
])
.unwrap();
assert_eq!(selected.peer_id, 2);
}
#[test]
fn public_ipv6_lease_allocator_stops_when_only_network_offset_is_left() {
let prefix = "2001:db8::/126".parse::<Ipv6Cidr>().unwrap();
let network = prefix.first_address();
let reserved = HashSet::from([
Ipv6Addr::from(u128::from(network) + 1),
Ipv6Addr::from(u128::from(network) + 2),
Ipv6Addr::from(u128::from(network) + 3),
]);
let leases = allocate_public_ipv6_leases(
prefix,
&[uuid::Uuid::from_u128(42)],
&reserved,
&HashMap::new(),
);
assert!(leases.is_empty());
}
}
+17
View File
@@ -1,3 +1,4 @@
use cidr::Ipv6Inet;
use cidr::{Ipv4Cidr, Ipv6Cidr}; use cidr::{Ipv4Cidr, Ipv6Cidr};
use dashmap::DashMap; use dashmap::DashMap;
use std::{ use std::{
@@ -93,6 +94,14 @@ pub trait Route {
// TODO: rewrite route management, remove this // TODO: rewrite route management, remove this
async fn list_proxy_cidrs_v6(&self) -> BTreeSet<Ipv6Cidr>; async fn list_proxy_cidrs_v6(&self) -> BTreeSet<Ipv6Cidr>;
async fn list_public_ipv6_routes(&self) -> BTreeSet<Ipv6Inet> {
BTreeSet::new()
}
async fn get_my_public_ipv6_addr(&self) -> Option<Ipv6Inet> {
None
}
async fn get_peer_id_by_ipv4(&self, _ipv4: &Ipv4Addr) -> Option<PeerId> { async fn get_peer_id_by_ipv4(&self, _ipv4: &Ipv4Addr) -> Option<PeerId> {
None None
} }
@@ -194,6 +203,14 @@ impl Route for MockRoute {
unimplemented!() unimplemented!()
} }
async fn list_public_ipv6_routes(&self) -> BTreeSet<Ipv6Inet> {
unimplemented!()
}
async fn get_my_public_ipv6_addr(&self) -> Option<Ipv6Inet> {
panic!("mock route")
}
async fn get_peer_info(&self, _peer_id: PeerId) -> Option<RoutePeerInfo> { async fn get_peer_info(&self, _peer_id: PeerId) -> Option<RoutePeerInfo> {
panic!("mock route") panic!("mock route")
} }
+2
View File
@@ -81,6 +81,7 @@ message Route {
optional int32 path_latency_latency_first = 14; optional int32 path_latency_latency_first = 14;
common.Ipv6Inet ipv6_addr = 15; common.Ipv6Inet ipv6_addr = 15;
common.Ipv6Inet public_ipv6_addr = 16;
} }
message PeerRoutePair { message PeerRoutePair {
@@ -100,6 +101,7 @@ message NodeInfo {
string version = 9; string version = 9;
common.PeerFeatureFlag feature_flag = 10; common.PeerFeatureFlag feature_flag = 10;
peer_rpc.GetIpListResponse ip_list = 11; peer_rpc.GetIpListResponse ip_list = 11;
common.Ipv6Inet public_ipv6_addr = 12;
} }
message ShowNodeInfoRequest { InstanceIdentifier instance = 1; } message ShowNodeInfoRequest { InstanceIdentifier instance = 1; }
+3
View File
@@ -96,6 +96,9 @@ message NetworkConfig {
optional bool need_p2p = 59; optional bool need_p2p = 59;
optional uint64 instance_recv_bps_limit = 60; optional uint64 instance_recv_bps_limit = 60;
optional bool disable_upnp = 61; optional bool disable_upnp = 61;
optional bool ipv6_public_addr_provider = 62;
optional bool ipv6_public_addr_auto = 63;
optional string ipv6_public_addr_prefix = 64;
} }
message PortForwardConfig { message PortForwardConfig {
+1
View File
@@ -225,6 +225,7 @@ message PeerFeatureFlag {
bool is_credential_peer = 8; bool is_credential_peer = 8;
bool need_p2p = 9; bool need_p2p = 9;
bool disable_p2p = 10; bool disable_p2p = 10;
bool ipv6_public_addr_provider = 11;
} }
enum SocketType { enum SocketType {
+43
View File
@@ -47,6 +47,9 @@ message RoutePeerInfo {
// Trusted credential public keys published by admin nodes (holding network_secret) // Trusted credential public keys published by admin nodes (holding network_secret)
repeated TrustedCredentialPubkeyProof trusted_credential_pubkeys = 19; repeated TrustedCredentialPubkeyProof trusted_credential_pubkeys = 19;
optional common.Ipv6Inet ipv6_public_addr_prefix = 22;
optional common.Ipv6Inet ipv6_public_addr_lease = 24;
} }
message PeerIdVersion { message PeerIdVersion {
@@ -133,6 +136,46 @@ service OspfRouteRpc {
rpc SyncRouteInfo(SyncRouteInfoRequest) returns (SyncRouteInfoResponse); rpc SyncRouteInfo(SyncRouteInfoRequest) returns (SyncRouteInfoResponse);
} }
message AcquireIpv6PublicAddrLeaseRequest {
uint32 peer_id = 1;
common.UUID inst_id = 2;
}
message RenewIpv6PublicAddrLeaseRequest {
uint32 peer_id = 1;
common.UUID inst_id = 2;
common.Ipv6Inet leased_addr = 3;
}
message ReleaseIpv6PublicAddrLeaseRequest {
uint32 peer_id = 1;
common.UUID inst_id = 2;
}
message GetIpv6PublicAddrLeaseRequest {
uint32 peer_id = 1;
common.UUID inst_id = 2;
}
message Ipv6PublicAddrLeaseReply {
uint32 provider_peer_id = 1;
common.UUID provider_inst_id = 2;
common.Ipv6Inet provider_prefix = 3;
common.Ipv6Inet leased_addr = 4;
google.protobuf.Timestamp valid_until = 5;
bool reused = 6;
optional string error_msg = 7;
}
service PublicIpv6AddrRpc {
rpc AcquireLease(AcquireIpv6PublicAddrLeaseRequest)
returns (Ipv6PublicAddrLeaseReply);
rpc RenewLease(RenewIpv6PublicAddrLeaseRequest)
returns (Ipv6PublicAddrLeaseReply);
rpc ReleaseLease(ReleaseIpv6PublicAddrLeaseRequest) returns (common.Void);
rpc GetLease(GetIpv6PublicAddrLeaseRequest) returns (Ipv6PublicAddrLeaseReply);
}
message GetIpListRequest {} message GetIpListRequest {}
message GetIpListResponse { message GetIpListResponse {
+1 -1
View File
@@ -38,7 +38,7 @@ async fn test_route_peer_info_ipv6() {
global_ctx.set_ipv6(Some(ipv6_cidr)); global_ctx.set_ipv6(Some(ipv6_cidr));
// Create RoutePeerInfo with IPv6 support // Create RoutePeerInfo with IPv6 support
let updated_info = RoutePeerInfo::new_updated_self(123, 456, &global_ctx); let updated_info = RoutePeerInfo::new_updated_self(123, 456, &global_ctx, None);
// Verify IPv6 address is included // Verify IPv6 address is included
assert!(updated_info.ipv6_addr.is_some()); assert!(updated_info.ipv6_addr.is_some());
+473
View File
@@ -402,6 +402,479 @@ async fn ping6_test(from_netns: &str, target_ip: &str, payload_size: Option<usiz
code.code().unwrap() == 0 code.code().unwrap() == 0
} }
fn run_cmd(program: &str, args: &[&str]) {
let output = std::process::Command::new(program)
.args(args)
.output()
.unwrap();
assert!(
output.status.success(),
"{} {:?} failed: stdout={}, stderr={}",
program,
args,
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
);
}
fn run_cmd_output(program: &str, args: &[&str]) -> String {
let output = std::process::Command::new(program)
.args(args)
.output()
.unwrap();
assert!(
output.status.success(),
"{} {:?} failed: stdout={}, stderr={}",
program,
args,
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
);
String::from_utf8(output.stdout).unwrap()
}
fn run_ip(args: &[&str]) {
run_cmd("ip", args);
}
fn run_ip_in_ns(ns: &str, args: &[&str]) {
let mut cmd = vec!["netns", "exec", ns, "ip"];
cmd.extend_from_slice(args);
run_cmd("ip", &cmd);
}
fn run_ip_in_ns_output(ns: &str, args: &[&str]) -> String {
let mut cmd = vec!["netns", "exec", ns, "ip"];
cmd.extend_from_slice(args);
run_cmd_output("ip", &cmd)
}
fn run_sysctl_in_ns(ns: &str, assignment: &str) {
run_cmd("ip", &["netns", "exec", ns, "sysctl", "-qw", assignment]);
}
fn create_empty_netns(name: &str) {
del_netns(name);
run_ip(&["netns", "add", name]);
run_ip(&["netns", "exec", name, "ip", "link", "set", "lo", "up"]);
}
fn connect_ns_to_bridge(ns: &str, guest_if: &str, host_if: &str, bridge: &str) {
let _ = std::process::Command::new("ip")
.args(["link", "del", host_if])
.status();
run_ip(&[
"link", "add", host_if, "type", "veth", "peer", "name", guest_if,
]);
run_ip(&["link", "set", guest_if, "netns", ns]);
run_ip(&["link", "set", host_if, "up"]);
run_cmd("brctl", &["addif", bridge, host_if]);
run_ip(&["netns", "exec", ns, "ip", "link", "set", guest_if, "up"]);
}
struct PublicIpv6Lab {
extra_namespaces: [&'static str; 2],
extra_bridges: [&'static str; 2],
}
impl PublicIpv6Lab {
const PROVIDER_NS: &'static str = "net_a";
const CLIENT_NS: &'static str = "net_b";
const UPSTREAM_NS: &'static str = "net_pubgw";
const SERVER_NS: &'static str = "net_pubsrv";
const WAN_BRIDGE: &'static str = "br_pubwan";
const SERVER_BRIDGE: &'static str = "br_pubsrv";
const PROVIDER_TUN: &'static str = "etpubv6p";
const CLIENT_TUN: &'static str = "etpubv6c";
const PROVIDER_PREFIX: &'static str = "2001:db8:100::/64";
const PROVIDER_DEFAULT_FROM: &'static str = "2001:db8:100::/64";
const PROVIDER_WAN_ADDR: &'static str = "2001:db8:ffff:1::2/64";
const UPSTREAM_WAN_ADDR: &'static str = "2001:db8:ffff:1::1/64";
const UPSTREAM_SERVER_ADDR: &'static str = "2001:db8:ffff:2::1/64";
const SERVER_ADDR: &'static str = "2001:db8:ffff:2::100/64";
const SERVER_IP: &'static str = "2001:db8:ffff:2::100";
fn setup() -> Self {
prepare_linux_namespaces();
del_netns(Self::UPSTREAM_NS);
del_netns(Self::SERVER_NS);
let _ = std::process::Command::new("ip")
.args(["link", "del", Self::WAN_BRIDGE])
.status();
let _ = std::process::Command::new("ip")
.args(["link", "del", Self::SERVER_BRIDGE])
.status();
let _ = std::process::Command::new("brctl")
.args(["delbr", Self::WAN_BRIDGE])
.status();
let _ = std::process::Command::new("brctl")
.args(["delbr", Self::SERVER_BRIDGE])
.status();
create_empty_netns(Self::UPSTREAM_NS);
create_empty_netns(Self::SERVER_NS);
prepare_bridge(Self::WAN_BRIDGE);
prepare_bridge(Self::SERVER_BRIDGE);
run_ip(&["link", "set", Self::WAN_BRIDGE, "up"]);
run_ip(&["link", "set", Self::SERVER_BRIDGE, "up"]);
connect_ns_to_bridge(
Self::PROVIDER_NS,
"pubwan0",
"veth_pubwan_p",
Self::WAN_BRIDGE,
);
connect_ns_to_bridge(
Self::UPSTREAM_NS,
"upwan0",
"veth_pubwan_u",
Self::WAN_BRIDGE,
);
connect_ns_to_bridge(
Self::UPSTREAM_NS,
"upsrv0",
"veth_pubsrv_u",
Self::SERVER_BRIDGE,
);
connect_ns_to_bridge(
Self::SERVER_NS,
"srv0",
"veth_pubsrv_s",
Self::SERVER_BRIDGE,
);
run_ip_in_ns(
Self::PROVIDER_NS,
&["addr", "add", Self::PROVIDER_WAN_ADDR, "dev", "pubwan0"],
);
run_ip_in_ns(
Self::UPSTREAM_NS,
&["addr", "add", Self::UPSTREAM_WAN_ADDR, "dev", "upwan0"],
);
run_ip_in_ns(
Self::UPSTREAM_NS,
&["addr", "add", Self::UPSTREAM_SERVER_ADDR, "dev", "upsrv0"],
);
run_ip_in_ns(
Self::SERVER_NS,
&["addr", "add", Self::SERVER_ADDR, "dev", "srv0"],
);
run_ip_in_ns(
Self::PROVIDER_NS,
&["link", "add", "pubprefix0", "type", "dummy"],
);
run_ip_in_ns(Self::PROVIDER_NS, &["link", "set", "pubprefix0", "up"]);
run_ip_in_ns(
Self::PROVIDER_NS,
&[
"-6",
"route",
"add",
Self::PROVIDER_PREFIX,
"dev",
"pubprefix0",
],
);
run_ip_in_ns(
Self::PROVIDER_NS,
&[
"-6",
"route",
"add",
"default",
"from",
Self::PROVIDER_DEFAULT_FROM,
"via",
"2001:db8:ffff:1::1",
"dev",
"pubwan0",
],
);
run_ip_in_ns(
Self::SERVER_NS,
&[
"-6",
"route",
"add",
"default",
"via",
"2001:db8:ffff:2::1",
"dev",
"srv0",
],
);
run_ip_in_ns(
Self::UPSTREAM_NS,
&[
"-6",
"route",
"add",
Self::PROVIDER_PREFIX,
"via",
"2001:db8:ffff:1::2",
"dev",
"upwan0",
],
);
run_sysctl_in_ns(Self::PROVIDER_NS, "net.ipv6.conf.all.forwarding=1");
run_sysctl_in_ns(Self::UPSTREAM_NS, "net.ipv6.conf.all.forwarding=1");
Self {
extra_namespaces: [Self::UPSTREAM_NS, Self::SERVER_NS],
extra_bridges: [Self::WAN_BRIDGE, Self::SERVER_BRIDGE],
}
}
}
impl Drop for PublicIpv6Lab {
fn drop(&mut self) {
for ns in self.extra_namespaces {
del_netns(ns);
}
for bridge in self.extra_bridges {
let _ = std::process::Command::new("ip")
.args(["link", "del", bridge])
.status();
let _ = std::process::Command::new("brctl")
.args(["delbr", bridge])
.status();
}
}
}
fn get_public_ipv6_config(
inst_name: &str,
netns: &str,
ipv4: &str,
dev_name: &str,
inst_id: uuid::Uuid,
) -> TomlConfigLoader {
let config = get_inst_config(inst_name, Some(netns), ipv4, "fd00::1/64");
config.set_id(inst_id);
config.set_ipv6(None);
config.set_socks5_portal(None);
config.set_network_identity(NetworkIdentity {
network_name: "public_ipv6_auto_addr_test".to_string(),
network_secret: Some("public_ipv6_auto_addr_secret".to_string()),
network_secret_digest: None,
});
config.set_listeners(vec!["tcp://0.0.0.0:11010".parse().unwrap()]);
let mut flags = config.get_flags();
flags.dev_name = dev_name.to_string();
config.set_flags(flags);
config
}
async fn init_public_ipv6_two_node(
client_inst_id: uuid::Uuid,
) -> (PublicIpv6Lab, Instance, Instance) {
let lab = PublicIpv6Lab::setup();
let provider_cfg = get_public_ipv6_config(
"provider_public_ipv6",
PublicIpv6Lab::PROVIDER_NS,
"10.144.144.1",
PublicIpv6Lab::PROVIDER_TUN,
uuid::Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
);
provider_cfg.set_ipv6_public_addr_provider(true);
let client_cfg = get_public_ipv6_config(
"client_public_ipv6",
PublicIpv6Lab::CLIENT_NS,
"10.144.144.2",
PublicIpv6Lab::CLIENT_TUN,
client_inst_id,
);
client_cfg.set_ipv6_public_addr_auto(true);
let mut provider = Instance::new(provider_cfg);
let mut client = Instance::new(client_cfg);
provider.run().await.unwrap();
client.run().await.unwrap();
provider
.get_conn_manager()
.add_connector(TcpTunnelConnector::new(
"tcp://10.1.1.2:11010".parse().unwrap(),
));
wait_for_condition(
|| async {
provider.get_peer_manager().list_routes().await.len() == 1
&& client.get_peer_manager().list_routes().await.len() == 1
},
Duration::from_secs(8),
)
.await;
(lab, provider, client)
}
async fn wait_for_public_ipv6_addr(inst: &Instance) -> cidr::Ipv6Inet {
wait_for_condition(
|| async {
inst.get_peer_manager()
.get_my_public_ipv6_addr()
.await
.is_some()
},
Duration::from_secs(10),
)
.await;
inst.get_peer_manager()
.get_my_public_ipv6_addr()
.await
.unwrap()
}
async fn wait_for_public_ipv6_route(inst: &Instance, target: cidr::Ipv6Inet) {
wait_for_condition(
|| async {
inst.get_peer_manager()
.list_public_ipv6_routes()
.await
.contains(&target)
},
Duration::from_secs(10),
)
.await;
}
fn route_exists_in_ns(ns: &str, needle: &str) -> bool {
run_ip_in_ns_output(ns, &["-6", "route", "show"])
.lines()
.any(|line| line.contains(needle))
}
fn addr_exists_in_ns(ns: &str, dev: &str, needle: &str) -> bool {
run_ip_in_ns_output(ns, &["-6", "addr", "show", "dev", dev]).contains(needle)
}
#[tokio::test]
#[serial_test::serial]
pub async fn public_ipv6_auto_addr_end_to_end() {
let client_id = uuid::Uuid::parse_str("22222222-2222-2222-2222-222222222222").unwrap();
let (_lab, provider, client) = init_public_ipv6_two_node(client_id).await;
wait_for_condition(
|| async {
provider
.get_global_ctx()
.config
.get_ipv6_public_addr_prefix()
== Some(PublicIpv6Lab::PROVIDER_PREFIX.parse().unwrap())
},
Duration::from_secs(10),
)
.await;
let leased = wait_for_public_ipv6_addr(&client).await;
wait_for_public_ipv6_route(&provider, leased).await;
assert_eq!(
provider
.get_global_ctx()
.config
.get_ipv6_public_addr_prefix(),
Some(PublicIpv6Lab::PROVIDER_PREFIX.parse().unwrap())
);
assert!(
leased.address().segments()[0] & 0xfe00 != 0xfc00,
"leased address should not be unique-local: {leased}"
);
wait_for_condition(
|| async {
addr_exists_in_ns(
PublicIpv6Lab::CLIENT_NS,
PublicIpv6Lab::CLIENT_TUN,
&leased.to_string(),
) && route_exists_in_ns(
PublicIpv6Lab::CLIENT_NS,
&format!("default dev {}", PublicIpv6Lab::CLIENT_TUN),
) && route_exists_in_ns(
PublicIpv6Lab::PROVIDER_NS,
&format!("{} dev {}", leased.address(), PublicIpv6Lab::PROVIDER_TUN),
)
},
Duration::from_secs(10),
)
.await;
wait_for_condition(
|| async { ping6_test(PublicIpv6Lab::CLIENT_NS, PublicIpv6Lab::SERVER_IP, None).await },
Duration::from_secs(10),
)
.await;
wait_for_condition(
|| async {
ping6_test(
PublicIpv6Lab::SERVER_NS,
leased.address().to_string().as_str(),
None,
)
.await
},
Duration::from_secs(10),
)
.await;
drop_insts(vec![provider, client]).await;
}
#[tokio::test]
#[serial_test::serial]
pub async fn public_ipv6_auto_addr_reconnect_reuses_same_address() {
let client_id = uuid::Uuid::parse_str("33333333-3333-3333-3333-333333333333").unwrap();
let (_lab, provider, client) = init_public_ipv6_two_node(client_id).await;
let first = wait_for_public_ipv6_addr(&client).await;
drop_insts(vec![client]).await;
let client_cfg = get_public_ipv6_config(
"client_public_ipv6_reconnect",
PublicIpv6Lab::CLIENT_NS,
"10.144.144.2",
PublicIpv6Lab::CLIENT_TUN,
client_id,
);
client_cfg.set_ipv6_public_addr_auto(true);
let mut client = Instance::new(client_cfg);
client.run().await.unwrap();
provider
.get_conn_manager()
.add_connector(TcpTunnelConnector::new(
"tcp://10.1.1.2:11010".parse().unwrap(),
));
wait_for_condition(
|| async {
provider.get_peer_manager().list_routes().await.len() == 1
&& client.get_peer_manager().list_routes().await.len() == 1
},
Duration::from_secs(8),
)
.await;
let second = wait_for_public_ipv6_addr(&client).await;
assert_eq!(first, second);
wait_for_condition(
|| async { ping6_test(PublicIpv6Lab::CLIENT_NS, PublicIpv6Lab::SERVER_IP, None).await },
Duration::from_secs(10),
)
.await;
drop_insts(vec![provider, client]).await;
}
#[rstest::rstest] #[rstest::rstest]
#[tokio::test] #[tokio::test]
#[serial_test::serial] #[serial_test::serial]