Add support for IPv6 within VPN (#1061)

* add flake.nix with nix based dev shell
* add support for IPv6
* update thunk

---------

Co-authored-by: sijie.sun <sijie.sun@smartx.com>
This commit is contained in:
DavHau
2025-07-04 22:43:30 +07:00
committed by GitHub
parent 01e491ec07
commit d0cfc49806
32 changed files with 893 additions and 70 deletions
+21
View File
@@ -64,6 +64,9 @@ pub trait ConfigLoader: Send + Sync {
fn get_ipv4(&self) -> Option<cidr::Ipv4Inet>;
fn set_ipv4(&self, addr: Option<cidr::Ipv4Inet>);
fn get_ipv6(&self) -> Option<cidr::Ipv6Inet>;
fn set_ipv6(&self, addr: Option<cidr::Ipv6Inet>);
fn get_dhcp(&self) -> bool;
fn set_dhcp(&self, dhcp: bool);
@@ -259,6 +262,7 @@ struct Config {
instance_name: Option<String>,
instance_id: Option<uuid::Uuid>,
ipv4: Option<String>,
ipv6: Option<String>,
dhcp: Option<bool>,
network_identity: Option<NetworkIdentity>,
listeners: Option<Vec<url::Url>>,
@@ -416,6 +420,23 @@ impl ConfigLoader for TomlConfigLoader {
};
}
fn get_ipv6(&self) -> Option<cidr::Ipv6Inet> {
let locked_config = self.config.lock().unwrap();
locked_config
.ipv6
.as_ref()
.map(|s| s.parse().ok())
.flatten()
}
fn set_ipv6(&self, addr: Option<cidr::Ipv6Inet>) {
self.config.lock().unwrap().ipv6 = if let Some(addr) = addr {
Some(addr.to_string())
} else {
None
};
}
fn get_dhcp(&self) -> bool {
self.config.lock().unwrap().dhcp.unwrap_or_default()
}
+16
View File
@@ -61,6 +61,7 @@ pub struct GlobalCtx {
event_bus: EventBus,
cached_ipv4: AtomicCell<Option<cidr::Ipv4Inet>>,
cached_ipv6: AtomicCell<Option<cidr::Ipv6Inet>>,
cached_proxy_cidrs: AtomicCell<Option<Vec<ProxyNetworkConfig>>>,
ip_collector: Mutex<Option<Arc<IPCollector>>>,
@@ -124,6 +125,7 @@ impl GlobalCtx {
event_bus,
cached_ipv4: AtomicCell::new(None),
cached_ipv6: AtomicCell::new(None),
cached_proxy_cidrs: AtomicCell::new(None),
ip_collector: Mutex::new(Some(Arc::new(IPCollector::new(
@@ -191,6 +193,20 @@ impl GlobalCtx {
self.cached_ipv4.store(None);
}
pub fn get_ipv6(&self) -> Option<cidr::Ipv6Inet> {
if let Some(ret) = self.cached_ipv6.load() {
return Some(ret);
}
let addr = self.config.get_ipv6();
self.cached_ipv6.store(addr.clone());
return addr;
}
pub fn set_ipv6(&self, addr: Option<cidr::Ipv6Inet>) {
self.config.set_ipv6(addr);
self.cached_ipv6.store(None);
}
pub fn get_id(&self) -> uuid::Uuid {
self.config.get_id()
}
+58
View File
@@ -80,4 +80,62 @@ impl IfConfiguerTrait for MacIfConfiger {
async fn set_mtu(&self, name: &str, mtu: u32) -> Result<(), Error> {
run_shell_cmd(format!("ifconfig {} mtu {}", name, mtu).as_str()).await
}
async fn add_ipv6_ip(
&self,
name: &str,
address: std::net::Ipv6Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!("ifconfig {} inet6 {}/{} add", name, address, cidr_prefix).as_str(),
)
.await
}
async fn remove_ipv6(&self, name: &str, ip: Option<std::net::Ipv6Addr>) -> Result<(), Error> {
if let Some(ip) = ip {
run_shell_cmd(format!("ifconfig {} inet6 {} delete", name, ip).as_str()).await
} else {
// Remove all IPv6 addresses is more complex on macOS, just succeed
Ok(())
}
}
async fn add_ipv6_route(
&self,
name: &str,
address: std::net::Ipv6Addr,
cidr_prefix: u8,
cost: Option<i32>,
) -> Result<(), Error> {
let cmd = if let Some(cost) = cost {
format!(
"route -n add -inet6 {}/{} -interface {} -hopcount {}",
address, cidr_prefix, name, cost
)
} else {
format!(
"route -n add -inet6 {}/{} -interface {}",
address, cidr_prefix, name
)
};
run_shell_cmd(cmd.as_str()).await
}
async fn remove_ipv6_route(
&self,
name: &str,
address: std::net::Ipv6Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"route -n delete -inet6 {}/{} -interface {}",
address, cidr_prefix, name
)
.as_str(),
)
.await
}
}
+29 -1
View File
@@ -7,7 +7,7 @@ mod windows;
mod route;
use std::net::Ipv4Addr;
use std::net::{Ipv4Addr, Ipv6Addr};
use async_trait::async_trait;
use tokio::process::Command;
@@ -41,12 +41,40 @@ pub trait IfConfiguerTrait: Send + Sync {
) -> Result<(), Error> {
Ok(())
}
async fn add_ipv6_route(
&self,
_name: &str,
_address: Ipv6Addr,
_cidr_prefix: u8,
_cost: Option<i32>,
) -> Result<(), Error> {
Ok(())
}
async fn remove_ipv6_route(
&self,
_name: &str,
_address: Ipv6Addr,
_cidr_prefix: u8,
) -> Result<(), Error> {
Ok(())
}
async fn add_ipv6_ip(
&self,
_name: &str,
_address: Ipv6Addr,
_cidr_prefix: u8,
) -> Result<(), Error> {
Ok(())
}
async fn set_link_status(&self, _name: &str, _up: bool) -> Result<(), Error> {
Ok(())
}
async fn remove_ip(&self, _name: &str, _ip: Option<Ipv4Addr>) -> Result<(), Error> {
Ok(())
}
async fn remove_ipv6(&self, _name: &str, _ip: Option<Ipv6Addr>) -> Result<(), Error> {
Ok(())
}
async fn wait_interface_show(&self, _name: &str) -> Result<(), Error> {
return Ok(());
}
+126
View File
@@ -194,6 +194,32 @@ impl NetlinkIfConfiger {
)
}
fn get_prefix_len_ipv6(name: &str, ip: Ipv6Addr) -> Result<u8, Error> {
let addrs = Self::list_addresses(name)?;
for addr in addrs {
if addr.address() == IpAddr::V6(ip) {
return Ok(addr.network_length());
}
}
Err(Error::NotFound)
}
fn remove_one_ipv6(name: &str, ip: Ipv6Addr, prefix_len: u8) -> Result<(), Error> {
let mut message = AddressMessage::default();
message.header.prefix_len = prefix_len;
message.header.index = NetlinkIfConfiger::get_interface_index(name)?;
message.header.family = AddressFamily::Inet6;
message
.attributes
.push(AddressAttribute::Address(std::net::IpAddr::V6(ip)));
send_netlink_req_and_wait_one_resp::<RouteNetlinkMessage>(
RouteNetlinkMessage::DelAddress(message),
true,
)
}
pub(crate) fn mtu_op<T: TryInto<Ioctl>>(
name: &str,
op: T,
@@ -469,6 +495,106 @@ impl IfConfiguerTrait for NetlinkIfConfiger {
Ok(())
}
async fn add_ipv6_ip(
&self,
name: &str,
address: std::net::Ipv6Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
let mut message = AddressMessage::default();
message.header.prefix_len = cidr_prefix;
message.header.index = NetlinkIfConfiger::get_interface_index(name)?;
message.header.family = AddressFamily::Inet6;
message
.attributes
.push(AddressAttribute::Address(std::net::IpAddr::V6(address)));
// For IPv6, we don't need IFA_LOCAL or IFA_BROADCAST
send_netlink_req_and_wait_one_resp::<RouteNetlinkMessage>(
RouteNetlinkMessage::NewAddress(message),
false,
)
}
async fn remove_ipv6(&self, name: &str, ip: Option<std::net::Ipv6Addr>) -> Result<(), Error> {
if ip.is_none() {
let addrs = Self::list_addresses(name)?;
for addr in addrs {
if let IpAddr::V6(ipv6) = addr.address() {
let prefix_len = addr.network_length();
Self::remove_one_ipv6(name, ipv6, prefix_len)?;
}
}
} else {
let ipv6 = ip.unwrap();
let prefix_len = Self::get_prefix_len_ipv6(name, ipv6)?;
Self::remove_one_ipv6(name, ipv6, prefix_len)?;
}
Ok(())
}
async fn add_ipv6_route(
&self,
name: &str,
address: std::net::Ipv6Addr,
cidr_prefix: u8,
cost: Option<i32>,
) -> Result<(), Error> {
let mut message = RouteMessage::default();
message.header.address_family = AddressFamily::Inet6;
message.header.destination_prefix_length = cidr_prefix;
message.header.table = RouteHeader::RT_TABLE_MAIN;
message.header.protocol = RouteProtocol::Static;
message.header.scope = RouteScope::Universe;
message.header.kind = RouteType::Unicast;
// Add metric (cost) if specified
if let Some(cost) = cost {
message
.attributes
.push(RouteAttribute::Priority(cost as u32));
}
message
.attributes
.push(RouteAttribute::Oif(NetlinkIfConfiger::get_interface_index(
name,
)?));
message
.attributes
.push(RouteAttribute::Destination(RouteAddress::Inet6(address)));
send_netlink_req_and_wait_one_resp(RouteNetlinkMessage::NewRoute(message), false)
}
async fn remove_ipv6_route(
&self,
name: &str,
address: std::net::Ipv6Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
let routes = Self::list_routes()?;
let ifidx = NetlinkIfConfiger::get_interface_index(name)?;
for msg in routes {
let other_route: Route = msg.clone().into();
if other_route.destination == std::net::IpAddr::V6(address)
&& other_route.prefix == cidr_prefix
&& other_route.ifindex == Some(ifidx)
{
send_netlink_req_and_wait_one_resp(RouteNetlinkMessage::DelRoute(msg), true)?;
return Ok(());
}
}
Ok(())
}
}
#[cfg(test)]
+68
View File
@@ -169,6 +169,74 @@ impl IfConfiguerTrait for WindowsIfConfiger {
)
.await
}
async fn add_ipv6_ip(
&self,
name: &str,
address: std::net::Ipv6Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"netsh interface ipv6 add address {} {}/{}",
name, address, cidr_prefix
)
.as_str(),
)
.await
}
async fn remove_ipv6(&self, name: &str, ip: Option<std::net::Ipv6Addr>) -> Result<(), Error> {
if let Some(ip) = ip {
run_shell_cmd(
format!("netsh interface ipv6 delete address {} {}", name, ip).as_str(),
)
.await
} else {
// Remove all IPv6 addresses
run_shell_cmd(
format!("netsh interface ipv6 delete address {} all", name).as_str(),
)
.await
}
}
async fn add_ipv6_route(
&self,
name: &str,
address: std::net::Ipv6Addr,
cidr_prefix: u8,
cost: Option<i32>,
) -> Result<(), Error> {
let cmd = if let Some(cost) = cost {
format!(
"netsh interface ipv6 add route {}/{} {} metric={}",
address, cidr_prefix, name, cost
)
} else {
format!(
"netsh interface ipv6 add route {}/{} {}",
address, cidr_prefix, name
)
};
run_shell_cmd(cmd.as_str()).await
}
async fn remove_ipv6_route(
&self,
name: &str,
address: std::net::Ipv6Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"netsh interface ipv6 delete route {}/{} {}",
address, cidr_prefix, name
)
.as_str(),
)
.await
}
}
pub struct RegistryManager;
+13
View File
@@ -148,6 +148,13 @@ struct NetworkOptions {
)]
ipv4: Option<String>,
#[arg(
long,
env = "ET_IPV6",
help = t!("core_clap.ipv6").to_string()
)]
ipv6: Option<String>,
#[arg(
short,
long,
@@ -615,6 +622,12 @@ impl NetworkOptions {
})?))
}
if let Some(ipv6) = &self.ipv6 {
cfg.set_ipv6(Some(ipv6.parse().with_context(|| {
format!("failed to parse ipv6 address: {}", ipv6)
})?))
}
if !self.peers.is_empty() {
let mut peers = cfg.get_peers();
peers.reserve(peers.len() + self.peers.len());
+1 -1
View File
@@ -301,7 +301,7 @@ impl Socks5ServerNet {
let dst = ipv4.get_destination();
let packet = ZCPacket::new_with_payload(&data);
if let Err(e) = peer_manager.send_msg_ipv4(packet, dst).await {
if let Err(e) = peer_manager.send_msg_by_ip(packet, IpAddr::V4(dst)).await {
tracing::error!("send to peer failed in smoltcp sender: {:?}", e);
}
}
+1 -1
View File
@@ -557,7 +557,7 @@ impl<C: NatDstConnector> TcpProxy<C> {
let dst = ipv4.get_destination();
let packet = ZCPacket::new_with_payload(&data);
if let Err(e) = peer_mgr.send_msg_ipv4(packet, dst).await {
if let Err(e) = peer_mgr.send_msg_by_ip(packet, IpAddr::V4(dst)).await {
tracing::error!("send to peer failed in smoltcp sender: {:?}", e);
}
}
+1 -1
View File
@@ -34,7 +34,7 @@ pub async fn prepare_env(dns_name: &str, tun_ip: Ipv4Inet) -> (Arc<PeerManager>,
let r = Arc::new(tokio::sync::Mutex::new(r));
let mut virtual_nic = NicCtx::new(peer_mgr.get_global_ctx(), &peer_mgr, r);
virtual_nic.run(tun_ip).await.unwrap();
virtual_nic.run(Some(tun_ip), None).await.unwrap();
(peer_mgr, virtual_nic)
}
+25 -20
View File
@@ -484,7 +484,7 @@ impl Instance {
&peer_manager_c,
_peer_packet_receiver.clone(),
);
if let Err(e) = new_nic_ctx.run(ip).await {
if let Err(e) = new_nic_ctx.run(Some(ip), global_ctx_c.get_ipv6()).await {
tracing::error!(
?current_dhcp_ip,
?candidate_ipv4_addr,
@@ -532,24 +532,29 @@ impl Instance {
if !self.global_ctx.config.get_flags().no_tun {
#[cfg(not(any(target_os = "android", target_env = "ohos")))]
if let Some(ipv4_addr) = self.global_ctx.get_ipv4() {
let mut new_nic_ctx = NicCtx::new(
self.global_ctx.clone(),
&self.peer_manager,
self.peer_packet_receiver.clone(),
);
new_nic_ctx.run(ipv4_addr).await?;
let ifname = new_nic_ctx.ifname().await;
Self::use_new_nic_ctx(
self.nic_ctx.clone(),
new_nic_ctx,
Self::create_magic_dns_runner(
self.peer_manager.clone(),
ifname,
ipv4_addr.clone(),
),
)
.await;
{
let ipv4_addr = self.global_ctx.get_ipv4();
let ipv6_addr = self.global_ctx.get_ipv6();
// Only run if we have at least one IP address (IPv4 or IPv6)
if ipv4_addr.is_some() || ipv6_addr.is_some() {
let mut new_nic_ctx = NicCtx::new(
self.global_ctx.clone(),
&self.peer_manager,
self.peer_packet_receiver.clone(),
);
new_nic_ctx.run(ipv4_addr, ipv6_addr).await?;
let ifname = new_nic_ctx.ifname().await;
// Create Magic DNS runner only if we have IPv4
let dns_runner = if let Some(ipv4) = ipv4_addr {
Self::create_magic_dns_runner(self.peer_manager.clone(), ifname, ipv4)
} else {
None
};
Self::use_new_nic_ctx(self.nic_ctx.clone(), new_nic_ctx, dns_runner).await;
}
}
}
@@ -852,7 +857,7 @@ impl Drop for Instance {
};
let now = std::time::Instant::now();
while now.elapsed().as_secs() < 1 {
while now.elapsed().as_secs() < 10 {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
if pm.strong_count() == 0 {
tracing::info!(
+92 -10
View File
@@ -1,7 +1,7 @@
use std::{
collections::BTreeSet,
io,
net::Ipv4Addr,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
pin::Pin,
sync::{Arc, Weak},
task::{Context, Poll},
@@ -25,7 +25,7 @@ use byteorder::WriteBytesExt as _;
use bytes::{BufMut, BytesMut};
use futures::{lock::BiLock, ready, SinkExt, Stream, StreamExt};
use pin_project_lite::pin_project;
use pnet::packet::ipv4::Ipv4Packet;
use pnet::packet::{ipv4::Ipv4Packet, ipv6::Ipv6Packet};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
sync::Mutex,
@@ -434,12 +434,26 @@ impl VirtualNic {
Ok(())
}
pub async fn add_ipv6_route(&self, address: Ipv6Addr, cidr: u8) -> Result<(), Error> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg
.add_ipv6_route(self.ifname(), address, cidr, None)
.await?;
Ok(())
}
pub async fn remove_ip(&self, ip: Option<Ipv4Addr>) -> Result<(), Error> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg.remove_ip(self.ifname(), ip).await?;
Ok(())
}
pub async fn remove_ipv6(&self, ip: Option<Ipv6Addr>) -> Result<(), Error> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg.remove_ipv6(self.ifname(), ip).await?;
Ok(())
}
pub async fn add_ip(&self, ip: Ipv4Addr, cidr: i32) -> Result<(), Error> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg
@@ -448,6 +462,14 @@ impl VirtualNic {
Ok(())
}
pub async fn add_ipv6(&self, ip: Ipv6Addr, cidr: i32) -> Result<(), Error> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg
.add_ipv6_ip(self.ifname(), ip, cidr as u8)
.await?;
Ok(())
}
pub fn get_ifcfg(&self) -> impl IfConfiguerTrait {
IfConfiger {}
}
@@ -496,6 +518,20 @@ impl NicCtx {
Ok(())
}
pub async fn assign_ipv6_to_tun_device(&self, ipv6_addr: cidr::Ipv6Inet) -> Result<(), Error> {
let nic = self.nic.lock().await;
nic.link_up().await?;
nic.remove_ipv6(None).await?;
nic.add_ipv6(ipv6_addr.address(), ipv6_addr.network_length() as i32)
.await?;
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
{
nic.add_ipv6_route(ipv6_addr.first_address(), ipv6_addr.network_length())
.await?;
}
Ok(())
}
async fn do_forward_nic_to_peers_ipv4(ret: ZCPacket, mgr: &PeerManager) {
if let Some(ipv4) = Ipv4Packet::new(ret.payload()) {
if ipv4.get_version() != 4 {
@@ -509,16 +545,53 @@ impl NicCtx {
);
// TODO: use zero-copy
let send_ret = mgr.send_msg_ipv4(ret, dst_ipv4).await;
let send_ret = mgr.send_msg_by_ip(ret, IpAddr::V4(dst_ipv4)).await;
if send_ret.is_err() {
tracing::trace!(?send_ret, "[USER_PACKET] send_msg_ipv4 failed")
tracing::trace!(?send_ret, "[USER_PACKET] send_msg failed")
}
} else {
tracing::warn!(?ret, "[USER_PACKET] not ipv4 packet");
}
}
fn do_forward_nic_to_peers(
async fn do_forward_nic_to_peers_ipv6(ret: ZCPacket, mgr: &PeerManager) {
if let Some(ipv6) = Ipv6Packet::new(ret.payload()) {
if ipv6.get_version() != 6 {
tracing::info!("[USER_PACKET] not ipv6 packet: {:?}", ipv6);
return;
}
let dst_ipv6 = ipv6.get_destination();
tracing::trace!(
?ret,
"[USER_PACKET] recv new packet from tun device and forward to peers."
);
// TODO: use zero-copy
let send_ret = mgr.send_msg_by_ip(ret, IpAddr::V6(dst_ipv6)).await;
if send_ret.is_err() {
tracing::trace!(?send_ret, "[USER_PACKET] send_msg failed")
}
} else {
tracing::warn!(?ret, "[USER_PACKET] not ipv6 packet");
}
}
async fn do_forward_nic_to_peers(ret: ZCPacket, mgr: &PeerManager) {
let payload = ret.payload();
if payload.is_empty() {
return;
}
match payload[0] >> 4 {
4 => Self::do_forward_nic_to_peers_ipv4(ret, mgr).await,
6 => Self::do_forward_nic_to_peers_ipv6(ret, mgr).await,
_ => {
tracing::warn!(?ret, "[USER_PACKET] unknown IP version");
}
}
}
fn do_forward_nic_to_peers_task(
&mut self,
mut stream: Pin<Box<dyn ZCPacketStream>>,
) -> Result<(), Error> {
@@ -532,7 +605,7 @@ impl NicCtx {
tracing::error!("read from nic failed: {:?}", ret);
break;
}
Self::do_forward_nic_to_peers_ipv4(ret.unwrap(), mgr.as_ref()).await;
Self::do_forward_nic_to_peers(ret.unwrap(), mgr.as_ref()).await;
}
panic!("nic stream closed");
});
@@ -647,7 +720,7 @@ impl NicCtx {
Ok(())
}
pub async fn run(&mut self, ipv4_addr: cidr::Ipv4Inet) -> Result<(), Error> {
pub async fn run(&mut self, ipv4_addr: Option<cidr::Ipv4Inet>, ipv6_addr: Option<cidr::Ipv6Inet>) -> Result<(), Error> {
let tunnel = {
let mut nic = self.nic.lock().await;
match nic.create_dev().await {
@@ -681,10 +754,19 @@ impl NicCtx {
let (stream, sink) = tunnel.split();
self.do_forward_nic_to_peers(stream)?;
self.do_forward_nic_to_peers_task(stream)?;
self.do_forward_peers_to_nic(sink);
self.assign_ipv4_to_tun_device(ipv4_addr).await?;
// Assign IPv4 address if provided
if let Some(ipv4_addr) = ipv4_addr {
self.assign_ipv4_to_tun_device(ipv4_addr).await?;
}
// Assign IPv6 address if provided
if let Some(ipv6_addr) = ipv6_addr {
self.assign_ipv6_to_tun_device(ipv6_addr).await?;
}
self.run_proxy_cidrs_route_updater().await?;
Ok(())
@@ -710,7 +792,7 @@ impl NicCtx {
let (stream, sink) = tunnel.split();
self.do_forward_nic_to_peers(stream)?;
self.do_forward_nic_to_peers_task(stream)?;
self.do_forward_peers_to_nic(sink);
Ok(())
+46 -6
View File
@@ -1,6 +1,6 @@
use std::{
fmt::Debug,
net::Ipv4Addr,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::{Arc, Weak},
time::{Instant, SystemTime},
};
@@ -873,6 +873,43 @@ impl PeerManager {
(dst_peers, is_exit_node)
}
pub async fn get_msg_dst_peer_ipv6(&self, ipv6_addr: &Ipv6Addr) -> (Vec<PeerId>, bool) {
let mut is_exit_node = false;
let mut dst_peers = vec![];
let network_length = self
.global_ctx
.get_ipv6()
.map(|x| x.network_length())
.unwrap_or(64);
let ipv6_inet = cidr::Ipv6Inet::new(*ipv6_addr, network_length).unwrap();
if ipv6_addr.is_multicast() || *ipv6_addr == ipv6_inet.last_address() {
dst_peers.extend(
self.peers
.list_routes()
.await
.iter()
.map(|x| x.key().clone()),
);
} else if let Some(peer_id) = self.peers.get_peer_id_by_ipv6(&ipv6_addr).await {
dst_peers.push(peer_id);
} else {
// For IPv6, we'll need to implement exit node support later
// For now, just try to find any available peer for routing
if dst_peers.is_empty() {
dst_peers.extend(
self.peers
.list_routes()
.await
.iter()
.map(|x| x.key().clone()),
);
is_exit_node = true;
}
}
(dst_peers, is_exit_node)
}
pub async fn try_compress_and_encrypt(
compress_algo: CompressorAlgo,
encryptor: &Box<dyn Encryptor>,
@@ -887,11 +924,11 @@ impl PeerManager {
Ok(())
}
pub async fn send_msg_ipv4(&self, mut msg: ZCPacket, ipv4_addr: Ipv4Addr) -> Result<(), Error> {
pub async fn send_msg_by_ip(&self, mut msg: ZCPacket, ip_addr: IpAddr) -> Result<(), Error> {
tracing::trace!(
"do send_msg in peer manager, msg: {:?}, ipv4_addr: {}",
"do send_msg in peer manager, msg: {:?}, ip_addr: {}",
msg,
ipv4_addr
ip_addr
);
msg.fill_peer_manager_hdr(
@@ -911,10 +948,13 @@ impl PeerManager {
.await;
}
let (dst_peers, is_exit_node) = self.get_msg_dst_peer(&ipv4_addr).await;
let (dst_peers, is_exit_node) = match ip_addr {
IpAddr::V4(ipv4_addr) => self.get_msg_dst_peer(&ipv4_addr).await,
IpAddr::V6(ipv6_addr) => self.get_msg_dst_peer_ipv6(&ipv6_addr).await,
};
if dst_peers.is_empty() {
tracing::info!("no peer id for ipv4: {}", ipv4_addr);
tracing::info!("no peer id for ip: {}", ip_addr);
return Ok(());
}
+11 -1
View File
@@ -1,4 +1,4 @@
use std::{net::Ipv4Addr, sync::Arc};
use std::{net::{Ipv4Addr, Ipv6Addr}, sync::Arc};
use anyhow::Context;
use dashmap::DashMap;
@@ -194,6 +194,16 @@ impl PeerMap {
None
}
pub async fn get_peer_id_by_ipv6(&self, ipv6: &Ipv6Addr) -> Option<PeerId> {
for route in self.routes.read().await.iter() {
let peer_id = route.get_peer_id_by_ipv6(ipv6).await;
if peer_id.is_some() {
return peer_id;
}
}
None
}
pub async fn get_route_peer_info(&self, peer_id: PeerId) -> Option<RoutePeerInfo> {
for route in self.routes.read().await.iter() {
if let Some(info) = route.get_peer_info(peer_id).await {
+37 -1
View File
@@ -1,7 +1,7 @@
use std::{
collections::BTreeSet,
fmt::Debug,
net::Ipv4Addr,
net::{Ipv4Addr, Ipv6Addr},
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc, Weak,
@@ -125,6 +125,7 @@ impl RoutePeerInfo {
peer_route_id: 0,
network_length: 24,
quic_port: None,
ipv6_addr: None,
}
}
@@ -165,6 +166,7 @@ impl RoutePeerInfo {
.unwrap_or(24),
quic_port: global_ctx.get_quic_proxy_port().map(|x| x as u32),
ipv6_addr: global_ctx.get_ipv6().map(|x| x.into()),
};
let need_update_periodically = if let Ok(Ok(d)) =
@@ -221,6 +223,8 @@ impl Into<crate::proto::cli::Route> for RoutePeerInfo {
next_hop_peer_id_latency_first: None,
cost_latency_first: None,
path_latency_latency_first: None,
ipv6_addr: self.ipv6_addr.map(Into::into),
}
}
}
@@ -635,6 +639,7 @@ struct RouteTable {
peer_infos: DashMap<PeerId, RoutePeerInfo>,
next_hop_map: NextHopMap,
ipv4_peer_id_map: DashMap<Ipv4Addr, PeerId>,
ipv6_peer_id_map: DashMap<Ipv6Addr, PeerId>,
cidr_peer_id_map: DashMap<cidr::IpCidr, PeerId>,
next_hop_map_version: AtomicVersion,
}
@@ -645,6 +650,7 @@ impl RouteTable {
peer_infos: DashMap::new(),
next_hop_map: DashMap::new(),
ipv4_peer_id_map: DashMap::new(),
ipv6_peer_id_map: DashMap::new(),
cidr_peer_id_map: DashMap::new(),
next_hop_map_version: AtomicVersion::new(),
}
@@ -742,6 +748,10 @@ impl RouteTable {
// remove ipv4 map for peers we cannot reach.
self.next_hop_map.contains_key(v)
});
self.ipv6_peer_id_map.retain(|_, v| {
// remove ipv6 map for peers we cannot reach.
self.next_hop_map.contains_key(v)
});
self.cidr_peer_id_map.retain(|_, v| {
// remove cidr map for peers we cannot reach.
self.next_hop_map.contains_key(v)
@@ -876,6 +886,17 @@ impl RouteTable {
.or_insert(*peer_id);
}
if let Some(ipv6_addr) = info.ipv6_addr.and_then(|x| x.address) {
self.ipv6_peer_id_map
.entry(ipv6_addr.into())
.and_modify(|v| {
if *v != *peer_id && is_new_peer_better(*v) {
*v = *peer_id;
}
})
.or_insert(*peer_id);
}
for cidr in info.proxy_cidrs.iter() {
self.cidr_peer_id_map
.entry(cidr.parse().unwrap())
@@ -2267,6 +2288,21 @@ impl Route for PeerRoute {
None
}
async fn get_peer_id_by_ipv6(&self, ipv6_addr: &Ipv6Addr) -> Option<PeerId> {
let route_table = &self.service_impl.route_table;
if let Some(peer_id) = route_table.ipv6_peer_id_map.get(ipv6_addr) {
return Some(*peer_id);
}
// TODO: Add proxy support for IPv6 similar to IPv4
// if let Some(peer_id) = route_table.get_peer_id_for_proxy_ipv6(ipv6_addr) {
// return Some(peer_id);
// }
tracing::debug!(?ipv6_addr, "no peer id for ipv6");
None
}
async fn set_route_cost_fn(&self, _cost_fn: RouteCostCalculator) {
*self.service_impl.cost_calculator.write().unwrap() = Some(_cost_fn);
self.service_impl.synced_route_info.version.inc();
+5
View File
@@ -36,6 +36,11 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer {
.chain(self.global_ctx.get_running_listeners().into_iter())
.map(Into::into)
.collect();
// remove et ipv6 from the interface ipv6 list
if let Some(et_ipv6) = self.global_ctx.get_ipv6() {
let et_ipv6: crate::proto::common::Ipv6Addr = et_ipv6.address().into();
ret.interface_ipv6s.retain(|x| *x != et_ipv6);
}
tracing::trace!(
"get_ip_list: public_ipv4: {:?}, public_ipv6: {:?}, listeners: {:?}",
ret.public_ipv4,
+5 -1
View File
@@ -1,4 +1,4 @@
use std::{net::Ipv4Addr, sync::Arc};
use std::{net::{Ipv4Addr, Ipv6Addr}, sync::Arc};
use dashmap::DashMap;
@@ -82,6 +82,10 @@ pub trait Route {
None
}
async fn get_peer_id_by_ipv6(&self, _ipv6: &Ipv6Addr) -> Option<PeerId> {
None
}
async fn list_peers_own_foreign_network(
&self,
_network_identity: &NetworkIdentity,
+2
View File
@@ -65,6 +65,8 @@ message Route {
optional uint32 next_hop_peer_id_latency_first = 12;
optional int32 cost_latency_first = 13;
optional int32 path_latency_latency_first = 14;
common.Ipv6Inet ipv6_addr = 15;
}
message PeerRoutePair {
+5
View File
@@ -139,6 +139,11 @@ message Ipv4Inet {
uint32 network_length = 2;
}
message Ipv6Inet {
Ipv6Addr address = 1;
uint32 network_length = 2;
}
message Url { string url = 1; }
message SocketAddr {
+35
View File
@@ -131,6 +131,41 @@ impl FromStr for Ipv4Inet {
}
}
impl From<cidr::Ipv6Inet> for Ipv6Inet {
fn from(value: cidr::Ipv6Inet) -> Self {
Ipv6Inet {
address: Some(value.address().into()),
network_length: value.network_length() as u32,
}
}
}
impl From<Ipv6Inet> for cidr::Ipv6Inet {
fn from(value: Ipv6Inet) -> Self {
cidr::Ipv6Inet::new(
value.address.unwrap_or_default().into(),
value.network_length as u8,
)
.unwrap()
}
}
impl fmt::Display for Ipv6Inet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", cidr::Ipv6Inet::from(self.clone()))
}
}
impl FromStr for Ipv6Inet {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Ipv6Inet::from(
cidr::Ipv6Inet::from_str(s).with_context(|| "Failed to parse Ipv6Inet")?,
))
}
}
impl From<url::Url> for Url {
fn from(value: url::Url) -> Self {
Url {
+1
View File
@@ -24,6 +24,7 @@ message RoutePeerInfo {
uint32 network_length = 13;
optional uint32 quic_port = 14;
optional common.Ipv6Inet ipv6_addr = 15;
}
message PeerIdVersion {
+66
View File
@@ -0,0 +1,66 @@
use std::net::Ipv6Addr;
use crate::{
common::config::{ConfigLoader, TomlConfigLoader},
common::global_ctx::tests::get_mock_global_ctx,
peers::peer_manager::RouteAlgoType,
proto::peer_rpc::RoutePeerInfo,
};
#[tokio::test]
async fn test_ipv6_config_support() {
let config = TomlConfigLoader::default();
// Test IPv6 configuration setting and getting
let ipv6_cidr = "fd00::1/64".parse().unwrap();
config.set_ipv6(Some(ipv6_cidr));
assert_eq!(config.get_ipv6(), Some(ipv6_cidr));
}
#[tokio::test]
async fn test_global_ctx_ipv6() {
let global_ctx = get_mock_global_ctx();
// Test setting and getting IPv6 from global context
let ipv6_cidr = "fd00::1/64".parse().unwrap();
global_ctx.set_ipv6(Some(ipv6_cidr));
assert_eq!(global_ctx.get_ipv6(), Some(ipv6_cidr));
}
#[tokio::test]
async fn test_route_peer_info_ipv6() {
let global_ctx = get_mock_global_ctx();
// Set IPv6 address in global context
let ipv6_cidr = "fd00::1/64".parse().unwrap();
global_ctx.set_ipv6(Some(ipv6_cidr));
// Create RoutePeerInfo with IPv6 support
let peer_info = RoutePeerInfo::new();
let updated_info = peer_info.update_self(123, 456, &global_ctx);
// Verify IPv6 address is included
assert!(updated_info.ipv6_addr.is_some());
let ipv6_addr: Ipv6Addr = updated_info.ipv6_addr.unwrap().address.unwrap().into();
assert_eq!(ipv6_addr, ipv6_cidr.address());
}
#[tokio::test]
async fn test_peer_manager_ipv6() {
let global_ctx = get_mock_global_ctx();
let (packet_sender, _packet_receiver) = tokio::sync::mpsc::channel(100);
let peer_mgr = crate::peers::peer_manager::PeerManager::new(
RouteAlgoType::Ospf,
global_ctx.clone(),
packet_sender,
);
// Test IPv6 address lookup for unknown address
let ipv6_addr = Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 2);
let (peers, _is_self) = peer_mgr.get_msg_dst_peer_ipv6(&ipv6_addr).await;
// Should return empty peers list for unknown IPv6
assert!(peers.is_empty());
}
+2
View File
@@ -1,6 +1,8 @@
#[cfg(target_os = "linux")]
mod three_node;
mod ipv6_test;
use crate::common::PeerId;
use crate::peers::peer_manager::PeerManager;
+85 -22
View File
@@ -51,11 +51,17 @@ pub fn prepare_linux_namespaces() {
add_ns_to_bridge("br_b", "net_d");
}
pub fn get_inst_config(inst_name: &str, ns: Option<&str>, ipv4: &str) -> TomlConfigLoader {
pub fn get_inst_config(
inst_name: &str,
ns: Option<&str>,
ipv4: &str,
ipv6: &str,
) -> TomlConfigLoader {
let config = TomlConfigLoader::default();
config.set_inst_name(inst_name.to_owned());
config.set_netns(ns.map(|s| s.to_owned()));
config.set_ipv4(Some(ipv4.parse().unwrap()));
config.set_ipv6(Some(ipv6.parse().unwrap()));
config.set_listeners(vec![
"tcp://0.0.0.0:11010".parse().unwrap(),
"udp://0.0.0.0:11010".parse().unwrap(),
@@ -82,16 +88,19 @@ pub async fn init_three_node_ex<F: Fn(TomlConfigLoader) -> TomlConfigLoader>(
"inst1",
Some("net_a"),
"10.144.144.1",
"fd00::1/64",
)));
let mut inst2 = Instance::new(cfg_cb(get_inst_config(
"inst2",
Some("net_b"),
"10.144.144.2",
"fd00::2/64",
)));
let mut inst3 = Instance::new(cfg_cb(get_inst_config(
"inst3",
Some("net_c"),
"10.144.144.3",
"fd00::3/64",
)));
inst1.run().await.unwrap();
@@ -232,6 +241,30 @@ async fn ping_test(from_netns: &str, target_ip: &str, payload_size: Option<usize
code.code().unwrap() == 0
}
async fn ping6_test(from_netns: &str, target_ip: &str, payload_size: Option<usize>) -> bool {
let _g = NetNS::new(Some(ROOT_NETNS_NAME.to_owned())).guard();
let code = tokio::process::Command::new("ip")
.args(&[
"netns",
"exec",
from_netns,
"ping6",
"-c",
"1",
"-s",
payload_size.unwrap_or(56).to_string().as_str(),
"-W",
"1",
target_ip.to_string().as_str(),
])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.await
.unwrap();
code.code().unwrap() == 0
}
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]
@@ -250,12 +283,26 @@ pub async fn basic_three_node_test(#[values("tcp", "udp", "wg", "ws", "wss")] pr
insts[0].get_peer_manager().list_routes().await,
);
// Test IPv4 connectivity
wait_for_condition(
|| async { ping_test("net_c", "10.144.144.1", None).await },
Duration::from_secs(5000),
)
.await;
// Test IPv6 connectivity
wait_for_condition(
|| async { ping6_test("net_c", "fd00::1", None).await },
Duration::from_secs(5),
)
.await;
wait_for_condition(
|| async { ping6_test("net_a", "fd00::3", None).await },
Duration::from_secs(5),
)
.await;
drop_insts(insts).await;
}
@@ -562,7 +609,12 @@ pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str
};
let insts = init_three_node(proto).await;
let mut inst4 = Instance::new(get_inst_config("inst4", Some("net_d"), "10.144.144.4"));
let mut inst4 = Instance::new(get_inst_config(
"inst4",
Some("net_d"),
"10.144.144.4",
"fd00::4/64",
));
if proto == "tcp" {
inst4
.get_conn_manager()
@@ -627,16 +679,7 @@ pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str
.iter()
.find(|r| **r == inst4.peer_id())
.is_none();
if !ret {
println!(
"conn info: {:?}",
insts[2]
.get_peer_manager()
.get_peer_map()
.list_peer_conns(inst4.peer_id())
.await
);
}
ret
},
// 0 down, assume last packet is recv in -0.01
@@ -726,13 +769,23 @@ pub async fn udp_broadcast_test() {
pub async fn foreign_network_forward_nic_data() {
prepare_linux_namespaces();
let center_node_config = get_inst_config("inst1", Some("net_a"), "10.144.144.1");
let center_node_config = get_inst_config("inst1", Some("net_a"), "10.144.144.1", "fd00::1/64");
center_node_config
.set_network_identity(NetworkIdentity::new("center".to_string(), "".to_string()));
let mut center_inst = Instance::new(center_node_config);
let mut inst1 = Instance::new(get_inst_config("inst1", Some("net_b"), "10.144.145.1"));
let mut inst2 = Instance::new(get_inst_config("inst2", Some("net_c"), "10.144.145.2"));
let mut inst1 = Instance::new(get_inst_config(
"inst1",
Some("net_b"),
"10.144.145.1",
"fd00:1::1/64",
));
let mut inst2 = Instance::new(get_inst_config(
"inst2",
Some("net_c"),
"10.144.145.2",
"fd00:1::2/64",
));
center_inst.run().await.unwrap();
inst1.run().await.unwrap();
@@ -940,21 +993,26 @@ pub async fn foreign_network_functional_cluster() {
crate::set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1);
prepare_linux_namespaces();
let center_node_config1 = get_inst_config("inst1", Some("net_a"), "10.144.144.1");
let center_node_config1 = get_inst_config("inst1", Some("net_a"), "10.144.144.1", "fd00::1/64");
center_node_config1
.set_network_identity(NetworkIdentity::new("center".to_string(), "".to_string()));
let mut center_inst1 = Instance::new(center_node_config1);
let center_node_config2 = get_inst_config("inst2", Some("net_b"), "10.144.144.2");
let center_node_config2 = get_inst_config("inst2", Some("net_b"), "10.144.144.2", "fd00::2/64");
center_node_config2
.set_network_identity(NetworkIdentity::new("center".to_string(), "".to_string()));
let mut center_inst2 = Instance::new(center_node_config2);
let inst1_config = get_inst_config("inst1", Some("net_c"), "10.144.145.1");
let inst1_config = get_inst_config("inst1", Some("net_c"), "10.144.145.1", "fd00:2::1/64");
inst1_config.set_listeners(vec![]);
let mut inst1 = Instance::new(inst1_config);
let mut inst2 = Instance::new(get_inst_config("inst2", Some("net_d"), "10.144.145.2"));
let mut inst2 = Instance::new(get_inst_config(
"inst2",
Some("net_d"),
"10.144.145.2",
"fd00:2::2/64",
));
center_inst1.run().await.unwrap();
center_inst2.run().await.unwrap();
@@ -1011,18 +1069,23 @@ pub async fn foreign_network_functional_cluster() {
pub async fn manual_reconnector(#[values(true, false)] is_foreign: bool) {
prepare_linux_namespaces();
let center_node_config = get_inst_config("inst1", Some("net_a"), "10.144.144.1");
let center_node_config = get_inst_config("inst1", Some("net_a"), "10.144.144.1", "fd00::1/64");
if is_foreign {
center_node_config
.set_network_identity(NetworkIdentity::new("center".to_string(), "".to_string()));
}
let mut center_inst = Instance::new(center_node_config);
let inst1_config = get_inst_config("inst1", Some("net_b"), "10.144.145.1");
let inst1_config = get_inst_config("inst1", Some("net_b"), "10.144.145.1", "fd00:1::1/64");
inst1_config.set_listeners(vec![]);
let mut inst1 = Instance::new(inst1_config);
let mut inst2 = Instance::new(get_inst_config("inst2", Some("net_c"), "10.144.145.2"));
let mut inst2 = Instance::new(get_inst_config(
"inst2",
Some("net_c"),
"10.144.145.2",
"fd00:1::2/64",
));
center_inst.run().await.unwrap();
inst1.run().await.unwrap();
+6 -1
View File
@@ -388,7 +388,12 @@ pub(crate) fn setup_sokcet2_ext(
}
}
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux", target_env = "ohos"))]
#[cfg(any(
target_os = "android",
target_os = "fuchsia",
target_os = "linux",
target_env = "ohos"
))]
if let Some(dev_name) = bind_dev {
tracing::trace!(dev_name = ?dev_name, "bind device");
socket2_socket.bind_device(Some(dev_name.as_bytes()))?;
+2 -2
View File
@@ -1,5 +1,5 @@
use std::{
net::{Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
};
@@ -128,7 +128,7 @@ impl WireGuardImpl {
tracing::trace!(?i, "Received from wg client");
let dst = i.get_destination();
let _ = peer_mgr
.send_msg_ipv4(ZCPacket::new_with_payload(inner.as_ref()), dst)
.send_msg_by_ip(ZCPacket::new_with_payload(inner.as_ref()), IpAddr::V4(dst))
.await;
}