mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-09 11:14:30 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dd2e3488fe | |||
| 67366aba55 | |||
| 463228f1de | |||
| 7908f9c146 |
@@ -202,7 +202,6 @@ pub struct GlobalCtx {
|
||||
|
||||
cached_ipv4: AtomicCell<Option<cidr::Ipv4Inet>>,
|
||||
cached_ipv6: AtomicCell<Option<cidr::Ipv6Inet>>,
|
||||
public_ipv6_lease: AtomicCell<Option<cidr::Ipv6Inet>>,
|
||||
cached_proxy_cidrs: AtomicCell<Option<Vec<ProxyNetworkConfig>>>,
|
||||
|
||||
ip_collector: Mutex<Option<Arc<IPCollector>>>,
|
||||
@@ -299,7 +298,6 @@ impl GlobalCtx {
|
||||
event_bus,
|
||||
cached_ipv4: AtomicCell::new(None),
|
||||
cached_ipv6: AtomicCell::new(None),
|
||||
public_ipv6_lease: AtomicCell::new(None),
|
||||
cached_proxy_cidrs: AtomicCell::new(None),
|
||||
|
||||
ip_collector: Mutex::new(Some(Arc::new(IPCollector::new(
|
||||
@@ -387,22 +385,6 @@ impl GlobalCtx {
|
||||
self.cached_ipv6.store(None);
|
||||
}
|
||||
|
||||
pub fn get_public_ipv6_lease(&self) -> Option<cidr::Ipv6Inet> {
|
||||
self.public_ipv6_lease.load()
|
||||
}
|
||||
|
||||
pub fn set_public_ipv6_lease(&self, addr: Option<cidr::Ipv6Inet>) {
|
||||
self.public_ipv6_lease.store(addr);
|
||||
}
|
||||
|
||||
pub fn is_ip_local_ipv6(&self, ip: &std::net::Ipv6Addr) -> bool {
|
||||
self.get_ipv6().map(|x| x.address() == *ip).unwrap_or(false)
|
||||
|| self
|
||||
.get_public_ipv6_lease()
|
||||
.map(|x| x.address() == *ip)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
pub fn get_advertised_ipv6_public_addr_prefix(&self) -> Option<cidr::Ipv6Cidr> {
|
||||
*self.advertised_ipv6_public_addr_prefix.lock().unwrap()
|
||||
}
|
||||
@@ -431,7 +413,7 @@ impl GlobalCtx {
|
||||
pub fn is_ip_local_virtual_ip(&self, ip: &IpAddr) -> bool {
|
||||
match ip {
|
||||
IpAddr::V4(v4) => self.get_ipv4().map(|x| x.address() == *v4).unwrap_or(false),
|
||||
IpAddr::V6(v6) => self.is_ip_local_ipv6(v6),
|
||||
IpAddr::V6(v6) => self.get_ipv6().map(|x| x.address() == *v6).unwrap_or(false),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -681,23 +663,23 @@ impl GlobalCtx {
|
||||
pub fn should_deny_proxy(&self, dst_addr: &SocketAddr, is_udp: bool) -> bool {
|
||||
let _g = self.net_ns.guard();
|
||||
let ip = dst_addr.ip();
|
||||
// first check if ip is an EasyTier-managed local address
|
||||
// first check if ip is virtual ip
|
||||
// then try bind this ip, if succ means it is local ip
|
||||
let dst_is_local_et_ip = self.is_ip_local_virtual_ip(&ip);
|
||||
let dst_is_local_virtual_ip = self.is_ip_local_virtual_ip(&ip);
|
||||
// this is an expensive operation, should be called sparingly
|
||||
// 1. tcp/kcp/quic call this only after proxy conn is established
|
||||
// 2. udp cache the result in nat entry
|
||||
let dst_is_local_phy_ip = std::net::UdpSocket::bind(format!("{}:0", ip)).is_ok();
|
||||
|
||||
tracing::trace!(
|
||||
"check should_deny_proxy: dst_addr={}, dst_is_local_et_ip={}, dst_is_local_phy_ip={}, is_udp={}",
|
||||
"check should_deny_proxy: dst_addr={}, dst_is_local_virtual_ip={}, dst_is_local_phy_ip={}, is_udp={}",
|
||||
dst_addr,
|
||||
dst_is_local_et_ip,
|
||||
dst_is_local_virtual_ip,
|
||||
dst_is_local_phy_ip,
|
||||
is_udp
|
||||
);
|
||||
|
||||
if dst_is_local_et_ip || dst_is_local_phy_ip {
|
||||
if dst_is_local_virtual_ip || dst_is_local_phy_ip {
|
||||
// if is local ip, make sure the port is not one of the listening ports
|
||||
self.is_port_in_running_listeners(dst_addr.port(), is_udp)
|
||||
|| (!is_udp && protected_port::is_protected_tcp_port(dst_addr.port()))
|
||||
@@ -826,40 +808,6 @@ pub mod tests {
|
||||
protected_port::clear_protected_tcp_ports_for_test();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn virtual_ipv6_and_public_ipv6_lease_are_stored_separately() {
|
||||
let config = TomlConfigLoader::default();
|
||||
let global_ctx = GlobalCtx::new(config);
|
||||
let virtual_ipv6 = "fd00::1/64".parse().unwrap();
|
||||
let public_ipv6 = "2001:db8::2/64".parse().unwrap();
|
||||
|
||||
global_ctx.set_ipv6(Some(virtual_ipv6));
|
||||
global_ctx.set_public_ipv6_lease(Some(public_ipv6));
|
||||
|
||||
assert_eq!(global_ctx.get_ipv6(), Some(virtual_ipv6));
|
||||
assert_eq!(global_ctx.get_public_ipv6_lease(), Some(public_ipv6));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn public_ipv6_lease_is_treated_as_local_ip() {
|
||||
protected_port::clear_protected_tcp_ports_for_test();
|
||||
|
||||
let config = TomlConfigLoader::default();
|
||||
let global_ctx = GlobalCtx::new(config);
|
||||
let public_ipv6 = "2001:db8::2/64".parse().unwrap();
|
||||
let listener: url::Url = "tcp://[2001:db8::2]:11010".parse().unwrap();
|
||||
global_ctx.set_public_ipv6_lease(Some(public_ipv6));
|
||||
global_ctx.add_running_listener(listener);
|
||||
|
||||
let ip = std::net::IpAddr::V6(public_ipv6.address());
|
||||
let socket = SocketAddr::from((public_ipv6.address(), 11010));
|
||||
|
||||
assert!(global_ctx.is_ip_local_virtual_ip(&ip));
|
||||
assert!(global_ctx.should_deny_proxy(&socket, false));
|
||||
|
||||
protected_port::clear_protected_tcp_ports_for_test();
|
||||
}
|
||||
|
||||
pub fn get_mock_global_ctx_with_network(
|
||||
network_identy: Option<NetworkIdentity>,
|
||||
) -> ArcGlobalCtx {
|
||||
|
||||
@@ -720,7 +720,7 @@ async fn check_udp_socket_local_addr(
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").await?;
|
||||
socket.connect(remote_mapped_addr).await?;
|
||||
if let Ok(local_addr) = socket.local_addr() {
|
||||
// local_addr should not be equal to an EasyTier-managed virtual/public address.
|
||||
// local_addr should not be equal to virtual ipv4 or virtual ipv6
|
||||
match local_addr.ip() {
|
||||
IpAddr::V4(ip) => {
|
||||
if global_ctx.get_ipv4().map(|ip| ip.address()) == Some(ip) {
|
||||
@@ -728,8 +728,8 @@ async fn check_udp_socket_local_addr(
|
||||
}
|
||||
}
|
||||
IpAddr::V6(ip) => {
|
||||
if global_ctx.is_ip_local_ipv6(&ip) {
|
||||
return Err(anyhow::anyhow!("local address is easytier-managed ipv6").into());
|
||||
if global_ctx.get_ipv6().map(|ip| ip.address()) == Some(ip) {
|
||||
return Err(anyhow::anyhow!("local address is virtual ipv6").into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,8 +66,7 @@ use super::dns_server::{MAGIC_DNS_FAKE_IP, runner::DnsRunner};
|
||||
use super::listeners::ListenerManager;
|
||||
use super::public_ipv6_provider::{
|
||||
reconcile_public_ipv6_provider_runtime, run_public_ipv6_provider_reconcile_task,
|
||||
should_run_public_ipv6_provider_reconcile, validate_public_ipv6_config,
|
||||
validate_public_ipv6_config_values,
|
||||
validate_public_ipv6_config,
|
||||
};
|
||||
|
||||
#[cfg(feature = "socks5")]
|
||||
@@ -257,64 +256,11 @@ pub struct InstanceConfigPatcher {
|
||||
}
|
||||
|
||||
impl InstanceConfigPatcher {
|
||||
fn parse_ipv6_public_addr_prefix_patch(
|
||||
prefix: Option<&str>,
|
||||
) -> Result<Option<Option<cidr::Ipv6Cidr>>, anyhow::Error> {
|
||||
let Some(prefix) = prefix else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let prefix = prefix.trim();
|
||||
if prefix.is_empty() {
|
||||
return Ok(Some(None));
|
||||
}
|
||||
|
||||
let parsed = prefix
|
||||
.parse()
|
||||
.with_context(|| format!("failed to parse ipv6 public address prefix: {prefix}"))?;
|
||||
Ok(Some(Some(parsed)))
|
||||
}
|
||||
|
||||
fn effective_ipv6_for_public_ipv6_validation(
|
||||
global_ctx: &ArcGlobalCtx,
|
||||
patch: &crate::proto::api::config::InstanceConfigPatch,
|
||||
_auto_enabled: bool,
|
||||
) -> Option<cidr::Ipv6Inet> {
|
||||
if let Some(ipv6) = patch.ipv6 {
|
||||
return Some(ipv6.into());
|
||||
}
|
||||
|
||||
global_ctx.get_ipv6()
|
||||
}
|
||||
|
||||
fn validate_public_ipv6_patch(
|
||||
global_ctx: &ArcGlobalCtx,
|
||||
patch: &crate::proto::api::config::InstanceConfigPatch,
|
||||
) -> Result<Option<Option<cidr::Ipv6Cidr>>, anyhow::Error> {
|
||||
let parsed_prefix =
|
||||
Self::parse_ipv6_public_addr_prefix_patch(patch.ipv6_public_addr_prefix.as_deref())?;
|
||||
|
||||
let auto_enabled = patch
|
||||
.ipv6_public_addr_auto
|
||||
.unwrap_or(global_ctx.config.get_ipv6_public_addr_auto());
|
||||
let provider_enabled = patch
|
||||
.ipv6_public_addr_provider
|
||||
.unwrap_or(global_ctx.config.get_ipv6_public_addr_provider());
|
||||
let prefix =
|
||||
parsed_prefix.unwrap_or_else(|| global_ctx.config.get_ipv6_public_addr_prefix());
|
||||
let ipv6 = Self::effective_ipv6_for_public_ipv6_validation(global_ctx, patch, auto_enabled);
|
||||
|
||||
validate_public_ipv6_config_values(ipv6, provider_enabled, auto_enabled, prefix)?;
|
||||
Ok(parsed_prefix)
|
||||
}
|
||||
|
||||
pub async fn apply_patch(
|
||||
&self,
|
||||
patch: crate::proto::api::config::InstanceConfigPatch,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let patch_for_event = patch.clone();
|
||||
let global_ctx = weak_upgrade(&self.global_ctx)?;
|
||||
let parsed_ipv6_public_addr_prefix = Self::validate_public_ipv6_patch(&global_ctx, &patch)?;
|
||||
|
||||
self.patch_port_forwards(patch.port_forwards).await?;
|
||||
self.patch_acl(patch.acl).await?;
|
||||
@@ -324,7 +270,9 @@ impl InstanceConfigPatcher {
|
||||
self.patch_mapped_listeners(patch.mapped_listeners).await?;
|
||||
self.patch_connector(patch.connectors).await?;
|
||||
|
||||
let provider_reconcile_was_running = should_run_public_ipv6_provider_reconcile(&global_ctx);
|
||||
let global_ctx = weak_upgrade(&self.global_ctx)?;
|
||||
let provider_reconcile_was_running = global_ctx.config.get_ipv6_public_addr_provider()
|
||||
&& global_ctx.config.get_ipv6_public_addr_prefix().is_none();
|
||||
let mut provider_config_changed = false;
|
||||
if let Some(hostname) = patch.hostname {
|
||||
global_ctx.set_hostname(hostname.clone());
|
||||
@@ -347,8 +295,16 @@ impl InstanceConfigPatcher {
|
||||
if let Some(enabled) = patch.ipv6_public_addr_auto {
|
||||
global_ctx.config.set_ipv6_public_addr_auto(enabled);
|
||||
}
|
||||
if let Some(prefix) = parsed_ipv6_public_addr_prefix {
|
||||
global_ctx.config.set_ipv6_public_addr_prefix(prefix);
|
||||
if let Some(prefix) = patch.ipv6_public_addr_prefix {
|
||||
let prefix = prefix.trim();
|
||||
let parsed = if prefix.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(prefix.parse().with_context(|| {
|
||||
format!("failed to parse ipv6 public address prefix: {prefix}")
|
||||
})?)
|
||||
};
|
||||
global_ctx.config.set_ipv6_public_addr_prefix(parsed);
|
||||
provider_config_changed = true;
|
||||
}
|
||||
|
||||
@@ -357,8 +313,8 @@ impl InstanceConfigPatcher {
|
||||
if provider_config_changed {
|
||||
reconcile_public_ipv6_provider_runtime(&global_ctx).await;
|
||||
|
||||
let provider_reconcile_should_run =
|
||||
should_run_public_ipv6_provider_reconcile(&global_ctx);
|
||||
let provider_reconcile_should_run = global_ctx.config.get_ipv6_public_addr_provider()
|
||||
&& global_ctx.config.get_ipv6_public_addr_prefix().is_none();
|
||||
if !provider_reconcile_was_running && provider_reconcile_should_run {
|
||||
run_public_ipv6_provider_reconcile_task(&global_ctx);
|
||||
}
|
||||
@@ -1631,9 +1587,7 @@ impl Drop for Instance {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
common::global_ctx::tests::get_mock_global_ctx,
|
||||
instance::instance::{InstanceConfigPatcher, InstanceRpcServerHook},
|
||||
proto::{api::config::InstanceConfigPatch, rpc_impl::standalone::RpcServerHook},
|
||||
instance::instance::InstanceRpcServerHook, proto::rpc_impl::standalone::RpcServerHook,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1754,50 +1708,4 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn validate_public_ipv6_patch_rejects_non_global_prefix() {
|
||||
let global_ctx = get_mock_global_ctx();
|
||||
let patch = InstanceConfigPatch {
|
||||
ipv6_public_addr_provider: Some(true),
|
||||
ipv6_public_addr_prefix: Some("fd00::/64".to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let err =
|
||||
InstanceConfigPatcher::validate_public_ipv6_patch(&global_ctx, &patch).unwrap_err();
|
||||
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("not a valid global unicast IPv6 prefix")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn validate_public_ipv6_patch_allows_enabling_auto_with_manual_ipv6() {
|
||||
let global_ctx = get_mock_global_ctx();
|
||||
global_ctx.set_ipv6(Some("fd00::1/64".parse().unwrap()));
|
||||
|
||||
let patch = InstanceConfigPatch {
|
||||
ipv6_public_addr_auto: Some(true),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert!(InstanceConfigPatcher::validate_public_ipv6_patch(&global_ctx, &patch).is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn validate_public_ipv6_patch_ignores_runtime_auto_ipv6_cache() {
|
||||
let global_ctx = get_mock_global_ctx();
|
||||
global_ctx.config.set_ipv6_public_addr_auto(true);
|
||||
global_ctx.set_ipv6(Some("2001:db8::10/64".parse().unwrap()));
|
||||
|
||||
let patch = InstanceConfigPatch {
|
||||
ipv6_public_addr_provider: Some(true),
|
||||
ipv6_public_addr_prefix: Some("2001:db8:100::/64".to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert!(InstanceConfigPatcher::validate_public_ipv6_patch(&global_ctx, &patch).is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
use anyhow::Context;
|
||||
use cidr::{Ipv6Cidr, Ipv6Inet};
|
||||
use cidr::Ipv6Cidr;
|
||||
#[cfg(target_os = "linux")]
|
||||
use netlink_packet_route::route::{RouteAddress, RouteAttribute, RouteMessage, RouteType};
|
||||
|
||||
@@ -44,12 +44,6 @@ fn should_run_public_ipv6_provider_reconcile_task(
|
||||
config.provider_enabled && config.configured_prefix.is_none()
|
||||
}
|
||||
|
||||
pub(super) fn should_run_public_ipv6_provider_reconcile(global_ctx: &ArcGlobalCtx) -> bool {
|
||||
should_run_public_ipv6_provider_reconcile_task(read_public_ipv6_provider_config_snapshot(
|
||||
global_ctx,
|
||||
))
|
||||
}
|
||||
|
||||
fn is_global_routable_public_ipv6_prefix(prefix: Ipv6Cidr) -> bool {
|
||||
let addr = prefix.first_address();
|
||||
!addr.is_loopback()
|
||||
@@ -59,19 +53,21 @@ fn is_global_routable_public_ipv6_prefix(prefix: Ipv6Cidr) -> bool {
|
||||
&& !addr.is_unspecified()
|
||||
}
|
||||
|
||||
pub(super) fn validate_public_ipv6_config_values(
|
||||
_ipv6: Option<Ipv6Inet>,
|
||||
provider_enabled: bool,
|
||||
_auto_enabled: bool,
|
||||
prefix: Option<Ipv6Cidr>,
|
||||
) -> Result<(), Error> {
|
||||
if !provider_enabled {
|
||||
pub(super) fn validate_public_ipv6_config(global_ctx: &ArcGlobalCtx) -> Result<(), Error> {
|
||||
if global_ctx.config.get_ipv6_public_addr_auto() && global_ctx.get_ipv6().is_some() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"cannot use --ipv6-public-addr-auto together with a manually set --ipv6; pick one or the other"
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
if !global_ctx.config.get_ipv6_public_addr_provider() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
ensure_public_ipv6_provider_supported()?;
|
||||
|
||||
if let Some(prefix) = prefix
|
||||
if let Some(prefix) = global_ctx.config.get_ipv6_public_addr_prefix()
|
||||
&& !is_global_routable_public_ipv6_prefix(prefix)
|
||||
{
|
||||
return Err(anyhow::anyhow!(
|
||||
@@ -84,15 +80,6 @@ pub(super) fn validate_public_ipv6_config_values(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn validate_public_ipv6_config(global_ctx: &ArcGlobalCtx) -> Result<(), Error> {
|
||||
validate_public_ipv6_config_values(
|
||||
global_ctx.get_ipv6(),
|
||||
global_ctx.config.get_ipv6_public_addr_provider(),
|
||||
global_ctx.config.get_ipv6_public_addr_auto(),
|
||||
global_ctx.config.get_ipv6_public_addr_prefix(),
|
||||
)
|
||||
}
|
||||
|
||||
fn ensure_public_ipv6_provider_supported() -> Result<(), Error> {
|
||||
if cfg!(target_os = "linux") {
|
||||
return Ok(());
|
||||
|
||||
@@ -920,7 +920,7 @@ impl NicCtx {
|
||||
}
|
||||
let src_ipv6 = ipv6.get_source();
|
||||
let dst_ipv6 = ipv6.get_destination();
|
||||
let is_local_src = mgr.get_global_ctx().is_ip_local_ipv6(&src_ipv6);
|
||||
let my_ipv6 = mgr.get_global_ctx().get_ipv6().map(|x| x.address());
|
||||
tracing::trace!(
|
||||
?ret,
|
||||
?src_ipv6,
|
||||
@@ -928,14 +928,14 @@ impl NicCtx {
|
||||
"[USER_PACKET] recv new packet from tun device and forward to peers."
|
||||
);
|
||||
|
||||
if src_ipv6.is_unicast_link_local() && !is_local_src {
|
||||
if src_ipv6.is_unicast_link_local() && Some(src_ipv6) != my_ipv6 {
|
||||
// do not route link local packet to other nodes unless the address is assigned by user
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: use zero-copy
|
||||
let send_ret = mgr
|
||||
.send_msg_by_ip(ret, IpAddr::V6(dst_ipv6), is_local_src)
|
||||
.send_msg_by_ip(ret, IpAddr::V6(dst_ipv6), Some(src_ipv6) == my_ipv6)
|
||||
.await;
|
||||
if send_ret.is_err() {
|
||||
tracing::trace!(?send_ret, "[USER_PACKET] send_msg failed")
|
||||
@@ -1356,9 +1356,9 @@ impl NicCtx {
|
||||
|
||||
self.run_proxy_cidrs_route_updater().await?;
|
||||
self.run_public_ipv6_route_updater().await?;
|
||||
// Keep the updater running so runtime config patches can enable auto mode
|
||||
// without recreating the NIC.
|
||||
self.run_public_ipv6_addr_updater().await?;
|
||||
if self.global_ctx.config.get_ipv6_public_addr_auto() {
|
||||
self.run_public_ipv6_addr_updater().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -292,33 +292,13 @@ impl AclFilter {
|
||||
processor.increment_stat(AclStatKey::PacketsTotal);
|
||||
}
|
||||
|
||||
fn classify_chain_type(
|
||||
is_in: bool,
|
||||
packet_info: &PacketInfo,
|
||||
my_ipv4: Option<Ipv4Addr>,
|
||||
is_local_ipv6: impl Fn(Ipv6Addr) -> bool,
|
||||
) -> ChainType {
|
||||
if !is_in {
|
||||
return ChainType::Outbound;
|
||||
}
|
||||
|
||||
let is_local_dst = packet_info.dst_ip == my_ipv4.unwrap_or(Ipv4Addr::UNSPECIFIED)
|
||||
|| matches!(packet_info.dst_ip, IpAddr::V6(dst) if is_local_ipv6(dst));
|
||||
|
||||
if is_local_dst {
|
||||
ChainType::Inbound
|
||||
} else {
|
||||
ChainType::Forward
|
||||
}
|
||||
}
|
||||
|
||||
/// Common ACL processing logic
|
||||
pub fn process_packet_with_acl(
|
||||
&self,
|
||||
packet: &ZCPacket,
|
||||
is_in: bool,
|
||||
my_ipv4: Option<Ipv4Addr>,
|
||||
is_local_ipv6: impl Fn(Ipv6Addr) -> bool,
|
||||
my_ipv6: Option<Ipv6Addr>,
|
||||
route: &(dyn super::route_trait::Route + Send + Sync + 'static),
|
||||
) -> bool {
|
||||
if !self.acl_enabled.load(Ordering::Relaxed) {
|
||||
@@ -343,7 +323,17 @@ impl AclFilter {
|
||||
}
|
||||
};
|
||||
|
||||
let chain_type = Self::classify_chain_type(is_in, &packet_info, my_ipv4, is_local_ipv6);
|
||||
let chain_type = if is_in {
|
||||
if packet_info.dst_ip == my_ipv4.unwrap_or(Ipv4Addr::UNSPECIFIED)
|
||||
|| packet_info.dst_ip == my_ipv6.unwrap_or(Ipv6Addr::UNSPECIFIED)
|
||||
{
|
||||
ChainType::Inbound
|
||||
} else {
|
||||
ChainType::Forward
|
||||
}
|
||||
} else {
|
||||
ChainType::Outbound
|
||||
};
|
||||
|
||||
// Get current processor atomically
|
||||
let processor = self.get_processor();
|
||||
@@ -394,55 +384,3 @@ impl AclFilter {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
common::acl_processor::PacketInfo,
|
||||
proto::acl::{ChainType, Protocol},
|
||||
};
|
||||
|
||||
use super::AclFilter;
|
||||
|
||||
fn packet_info(dst_ip: IpAddr) -> PacketInfo {
|
||||
PacketInfo {
|
||||
src_ip: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
|
||||
dst_ip,
|
||||
src_port: Some(1234),
|
||||
dst_port: Some(80),
|
||||
protocol: Protocol::Tcp,
|
||||
packet_size: 64,
|
||||
src_groups: Arc::new(Vec::new()),
|
||||
dst_groups: Arc::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn classify_chain_type_treats_public_ipv6_lease_as_inbound() {
|
||||
let leased_ipv6 = Ipv6Addr::new(0x2001, 0xdb8, 0x100, 0, 0, 0, 0, 0x123);
|
||||
let packet_info = packet_info(IpAddr::V6(leased_ipv6));
|
||||
|
||||
let chain =
|
||||
AclFilter::classify_chain_type(true, &packet_info, None, |ip| ip == leased_ipv6);
|
||||
|
||||
assert_eq!(chain, ChainType::Inbound);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn classify_chain_type_keeps_non_local_ipv6_as_forward() {
|
||||
let leased_ipv6 = Ipv6Addr::new(0x2001, 0xdb8, 0x100, 0, 0, 0, 0, 0x123);
|
||||
let packet_info = packet_info(IpAddr::V6(Ipv6Addr::new(
|
||||
0x2001, 0xdb8, 0xffff, 2, 0, 0, 0, 0x100,
|
||||
)));
|
||||
|
||||
let chain =
|
||||
AclFilter::classify_chain_type(true, &packet_info, None, |ip| ip == leased_ipv6);
|
||||
|
||||
assert_eq!(chain, ChainType::Forward);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1062,7 +1062,7 @@ impl PeerManager {
|
||||
&ret,
|
||||
true,
|
||||
global_ctx.get_ipv4().map(|x| x.address()),
|
||||
|dst| global_ctx.is_ip_local_ipv6(&dst),
|
||||
global_ctx.get_ipv6().map(|x| x.address()),
|
||||
&route,
|
||||
) {
|
||||
continue;
|
||||
@@ -1342,7 +1342,7 @@ impl PeerManager {
|
||||
data,
|
||||
false,
|
||||
None,
|
||||
|_| false,
|
||||
None,
|
||||
&self.get_route(),
|
||||
) {
|
||||
return false;
|
||||
@@ -1544,10 +1544,6 @@ impl PeerManager {
|
||||
dst_peers.extend(self.peers.list_routes().await.iter().map(|x| *x.key()));
|
||||
} else if let Some(peer_id) = self.peers.get_peer_id_by_ipv6(ipv6_addr).await {
|
||||
dst_peers.push(peer_id);
|
||||
} else if !ipv6_addr.is_unicast_link_local()
|
||||
&& let Some(peer_id) = self.get_route().get_public_ipv6_gateway_peer_id().await
|
||||
{
|
||||
dst_peers.push(peer_id);
|
||||
} else if !ipv6_addr.is_unicast_link_local() {
|
||||
// NOTE: never route link local address to exit node.
|
||||
for exit_node in self.exit_nodes.read().await.iter() {
|
||||
@@ -1678,7 +1674,7 @@ impl PeerManager {
|
||||
&& !self.global_ctx.is_ip_local_virtual_ip(&ip_addr)
|
||||
{
|
||||
// Keep the loop-prevention flags for proxy-induced self-delivery where
|
||||
// the destination is not this node's own EasyTier-managed IP.
|
||||
// the destination is not this node's own virtual IP.
|
||||
hdr.set_not_send_to_tun(true);
|
||||
hdr.set_no_proxy(true);
|
||||
}
|
||||
|
||||
@@ -3954,10 +3954,6 @@ impl Route for PeerRoute {
|
||||
self.public_ipv6_service.my_addr()
|
||||
}
|
||||
|
||||
async fn get_public_ipv6_gateway_peer_id(&self) -> Option<PeerId> {
|
||||
self.public_ipv6_service.provider_peer_id_for_client()
|
||||
}
|
||||
|
||||
async fn get_local_public_ipv6_info(
|
||||
&self,
|
||||
) -> crate::proto::api::instance::ListPublicIpv6InfoResponse {
|
||||
|
||||
@@ -41,10 +41,6 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer {
|
||||
let et_ipv6: crate::proto::common::Ipv6Addr = et_ipv6.address().into();
|
||||
ret.interface_ipv6s.retain(|x| *x != et_ipv6);
|
||||
}
|
||||
if let Some(public_ipv6) = self.global_ctx.get_public_ipv6_lease() {
|
||||
let public_ipv6: crate::proto::common::Ipv6Addr = public_ipv6.address().into();
|
||||
ret.interface_ipv6s.retain(|x| *x != public_ipv6);
|
||||
}
|
||||
tracing::trace!(
|
||||
"get_ip_list: public_ipv4: {:?}, public_ipv6: {:?}, listeners: {:?}",
|
||||
ret.public_ipv4,
|
||||
|
||||
@@ -226,7 +226,9 @@ impl PublicIpv6Service {
|
||||
if *cached_my_addr != my_addr {
|
||||
let old = *cached_my_addr;
|
||||
*cached_my_addr = my_addr;
|
||||
self.global_ctx.set_public_ipv6_lease(my_addr);
|
||||
if self.global_ctx.config.get_ipv6_public_addr_auto() {
|
||||
self.global_ctx.set_ipv6(my_addr);
|
||||
}
|
||||
self.global_ctx
|
||||
.issue_event(GlobalCtxEvent::PublicIpv6Changed(old, my_addr));
|
||||
}
|
||||
@@ -640,11 +642,6 @@ impl PublicIpv6Service {
|
||||
*self.my_addr_cache.lock().unwrap()
|
||||
}
|
||||
|
||||
pub(crate) fn provider_peer_id_for_client(&self) -> Option<PeerId> {
|
||||
self.current_client_state()
|
||||
.map(|state| state.provider.peer_id)
|
||||
}
|
||||
|
||||
pub(crate) fn local_provider_state(
|
||||
&self,
|
||||
) -> Option<(PublicIpv6Provider, Vec<PublicIpv6ProviderLease>)> {
|
||||
@@ -847,48 +844,12 @@ fn allocate_public_ipv6_leases(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::net::Ipv6Addr;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use cidr::{Ipv6Cidr, Ipv6Inet};
|
||||
use cidr::Ipv6Cidr;
|
||||
|
||||
use crate::{
|
||||
common::{PeerId, global_ctx::tests::get_mock_global_ctx},
|
||||
peers::peer_rpc::PeerRpcManager,
|
||||
};
|
||||
|
||||
use super::{
|
||||
PublicIpv6PeerRouteInfo, PublicIpv6RouteControl, PublicIpv6Service, PublicIpv6SyncTrigger,
|
||||
allocate_public_ipv6_leases,
|
||||
};
|
||||
|
||||
struct TestRouteControl {
|
||||
my_peer_id: PeerId,
|
||||
peers: Mutex<Vec<PublicIpv6PeerRouteInfo>>,
|
||||
}
|
||||
|
||||
impl PublicIpv6RouteControl for TestRouteControl {
|
||||
fn my_peer_id(&self) -> PeerId {
|
||||
self.my_peer_id
|
||||
}
|
||||
|
||||
fn peer_route_snapshot(&self) -> Vec<PublicIpv6PeerRouteInfo> {
|
||||
self.peers.lock().unwrap().clone()
|
||||
}
|
||||
|
||||
fn publish_self_public_ipv6_lease(&self, _lease: Option<Ipv6Inet>) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
struct TestSyncTrigger;
|
||||
|
||||
impl PublicIpv6SyncTrigger for TestSyncTrigger {
|
||||
fn sync_now(&self, _reason: &str) {}
|
||||
}
|
||||
use super::{PublicIpv6PeerRouteInfo, PublicIpv6Service, allocate_public_ipv6_leases};
|
||||
|
||||
#[test]
|
||||
fn public_ipv6_lease_allocator_keeps_stable_addresses() {
|
||||
@@ -978,64 +939,4 @@ mod tests {
|
||||
|
||||
assert!(leases.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconcile_runtime_clears_public_ipv6_lease_when_auto_is_disabled() {
|
||||
let global_ctx = get_mock_global_ctx();
|
||||
global_ctx.config.set_ipv6_public_addr_auto(false);
|
||||
|
||||
let virtual_addr = "fd00::1/64".parse().unwrap();
|
||||
let stale_addr = "2001:db8::123/64".parse().unwrap();
|
||||
global_ctx.set_ipv6(Some(virtual_addr));
|
||||
global_ctx.set_public_ipv6_lease(Some(stale_addr));
|
||||
|
||||
let service = Arc::new(PublicIpv6Service::new(
|
||||
global_ctx.clone(),
|
||||
std::sync::Weak::<PeerRpcManager>::new(),
|
||||
Arc::new(TestRouteControl {
|
||||
my_peer_id: 1,
|
||||
peers: Mutex::new(Vec::new()),
|
||||
}),
|
||||
Arc::new(TestSyncTrigger),
|
||||
));
|
||||
*service.my_addr_cache.lock().unwrap() = Some(stale_addr);
|
||||
|
||||
service.reconcile_runtime_from_snapshot(&[]);
|
||||
|
||||
assert_eq!(*service.my_addr_cache.lock().unwrap(), None);
|
||||
assert_eq!(global_ctx.get_ipv6(), Some(virtual_addr));
|
||||
assert_eq!(global_ctx.get_public_ipv6_lease(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reconcile_runtime_keeps_virtual_ipv6_when_public_lease_changes() {
|
||||
let global_ctx = get_mock_global_ctx();
|
||||
global_ctx.config.set_ipv6_public_addr_auto(true);
|
||||
|
||||
let virtual_addr = "fd00::1/64".parse().unwrap();
|
||||
let public_addr = "2001:db8::123/64".parse().unwrap();
|
||||
global_ctx.set_ipv6(Some(virtual_addr));
|
||||
|
||||
let service = Arc::new(PublicIpv6Service::new(
|
||||
global_ctx.clone(),
|
||||
std::sync::Weak::<PeerRpcManager>::new(),
|
||||
Arc::new(TestRouteControl {
|
||||
my_peer_id: 1,
|
||||
peers: Mutex::new(vec![PublicIpv6PeerRouteInfo {
|
||||
peer_id: 1,
|
||||
inst_id: Some(uuid::Uuid::from_u128(1)),
|
||||
is_provider: false,
|
||||
prefix: None,
|
||||
lease: Some(public_addr),
|
||||
reachable: true,
|
||||
}]),
|
||||
}),
|
||||
Arc::new(TestSyncTrigger),
|
||||
));
|
||||
|
||||
service.reconcile_runtime();
|
||||
|
||||
assert_eq!(global_ctx.get_ipv6(), Some(virtual_addr));
|
||||
assert_eq!(global_ctx.get_public_ipv6_lease(), Some(public_addr));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,10 +105,6 @@ pub trait Route {
|
||||
None
|
||||
}
|
||||
|
||||
async fn get_public_ipv6_gateway_peer_id(&self) -> Option<PeerId> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn get_local_public_ipv6_info(&self) -> ListPublicIpv6InfoResponse {
|
||||
ListPublicIpv6InfoResponse::default()
|
||||
}
|
||||
|
||||
@@ -3599,15 +3599,7 @@ pub async fn config_patch_test() {
|
||||
};
|
||||
use crate::tunnel::common::tests::_tunnel_pingpong_netns_with_timeout;
|
||||
|
||||
let insts = init_three_node_ex(
|
||||
"udp",
|
||||
|cfg| {
|
||||
cfg.set_ipv6(None);
|
||||
cfg
|
||||
},
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
let insts = init_three_node("udp").await;
|
||||
|
||||
check_route(
|
||||
"10.144.144.2/24",
|
||||
|
||||
Reference in New Issue
Block a user