feat(acl): add group-based ACL rules and related structures (#1265)

* feat(acl): add group-based ACL rules and related structures

* refactor(acl): optimize group handling with Arc and improve cache management

* refactor(acl): clippy

* feat(tests): add performance tests for generate_with_proof and verify methods

* feat: update group_trust_map to use HashMap for more secure group proofs

* refactor: refactor the logic of the trusted group getting and setting

* feat(acl): support kcp/quic use group acl

* feat(proxy): optimize group retrieval by IP in Kcp and Quic proxy handlers

* feat(tests): add group-based ACL tree node test

* always allow quic proxy traffic

---------

Co-authored-by: Sijie.Sun <sunsijie@buaa.edu.cn>
Co-authored-by: sijie.sun <sijie.sun@smartx.com>
This commit is contained in:
Mg Pig
2025-08-22 22:25:00 +08:00
committed by GitHub
parent 34560af141
commit 08a92a53c3
18 changed files with 1042 additions and 29 deletions
+69 -3
View File
@@ -1,5 +1,5 @@
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicU16, Ordering};
use std::{
net::IpAddr,
sync::{atomic::AtomicBool, Arc},
@@ -25,6 +25,7 @@ pub struct AclFilter {
// Use ArcSwap for lock-free atomic replacement during hot reload
acl_processor: ArcSwap<AclProcessor>,
acl_enabled: Arc<AtomicBool>,
quic_udp_port: AtomicU16,
}
impl Default for AclFilter {
@@ -38,6 +39,7 @@ impl AclFilter {
Self {
acl_processor: ArcSwap::from(Arc::new(AclProcessor::new(Acl::default()))),
acl_enabled: Arc::new(AtomicBool::new(false)),
quic_udp_port: AtomicU16::new(0),
}
}
@@ -88,7 +90,11 @@ impl AclFilter {
}
/// Extract packet information for ACL processing
fn extract_packet_info(&self, packet: &ZCPacket) -> Option<PacketInfo> {
fn extract_packet_info(
&self,
packet: &ZCPacket,
route: &(dyn super::route_trait::Route + Send + Sync + 'static),
) -> Option<PacketInfo> {
let payload = packet.payload();
let src_ip;
@@ -155,6 +161,15 @@ impl AclFilter {
_ => Protocol::Unspecified,
};
let src_groups = packet
.get_src_peer_id()
.map(|peer_id| route.get_peer_groups(peer_id))
.unwrap_or_else(|| Arc::new(Vec::new()));
let dst_groups = packet
.get_dst_peer_id()
.map(|peer_id| route.get_peer_groups(peer_id))
.unwrap_or_else(|| Arc::new(Vec::new()));
Some(PacketInfo {
src_ip,
dst_ip,
@@ -162,6 +177,8 @@ impl AclFilter {
dst_port,
protocol: acl_protocol,
packet_size: payload.len(),
src_groups,
dst_groups,
})
}
@@ -181,6 +198,8 @@ impl AclFilter {
dst_ip = %packet_info.dst_ip,
src_port = packet_info.src_port,
dst_port = packet_info.dst_port,
src_group = packet_info.src_groups.join(","),
dst_group = packet_info.dst_groups.join(","),
protocol = ?packet_info.protocol,
action = ?result.action,
rule = result.matched_rule_str().as_deref().unwrap_or("unknown"),
@@ -226,6 +245,40 @@ impl AclFilter {
processor.increment_stat(AclStatKey::PacketsTotal);
}
fn check_is_quic_packet(
&self,
packet_info: &PacketInfo,
my_ipv4: &Option<Ipv4Addr>,
my_ipv6: &Option<Ipv6Addr>,
) -> bool {
if packet_info.protocol != Protocol::Udp {
return false;
}
let quic_port = self.get_quic_udp_port();
if quic_port == 0 {
return false;
}
// quic input
if packet_info.dst_port == Some(quic_port)
&& (packet_info.dst_ip == my_ipv4.unwrap_or(Ipv4Addr::UNSPECIFIED)
|| packet_info.dst_ip == my_ipv6.unwrap_or(Ipv6Addr::UNSPECIFIED))
{
return true;
}
// quic output
if packet_info.src_port == Some(quic_port)
&& (packet_info.src_ip == my_ipv4.unwrap_or(Ipv4Addr::UNSPECIFIED)
|| packet_info.src_ip == my_ipv6.unwrap_or(Ipv6Addr::UNSPECIFIED))
{
return true;
}
false
}
/// Common ACL processing logic
pub fn process_packet_with_acl(
&self,
@@ -233,6 +286,7 @@ impl AclFilter {
is_in: bool,
my_ipv4: Option<Ipv4Addr>,
my_ipv6: Option<Ipv6Addr>,
route: &(dyn super::route_trait::Route + Send + Sync + 'static),
) -> bool {
if !self.acl_enabled.load(Ordering::Relaxed) {
return true;
@@ -243,7 +297,7 @@ impl AclFilter {
}
// Extract packet information
let packet_info = match self.extract_packet_info(packet) {
let packet_info = match self.extract_packet_info(packet, route) {
Some(info) => info,
None => {
tracing::warn!(
@@ -256,6 +310,10 @@ impl AclFilter {
}
};
if self.check_is_quic_packet(&packet_info, &my_ipv4, &my_ipv6) {
return true;
}
let chain_type = if is_in {
if packet_info.dst_ip == my_ipv4.unwrap_or(Ipv4Addr::UNSPECIFIED)
|| packet_info.dst_ip == my_ipv6.unwrap_or(Ipv6Addr::UNSPECIFIED)
@@ -292,4 +350,12 @@ impl AclFilter {
}
}
}
pub fn get_quic_udp_port(&self) -> u16 {
self.quic_udp_port.load(Ordering::Relaxed)
}
pub fn set_quic_udp_port(&self, port: u16) {
self.quic_udp_port.store(port, Ordering::Relaxed);
}
}
+11 -7
View File
@@ -32,7 +32,7 @@ use crate::{
peer_conn::PeerConn,
peer_rpc::PeerRpcManagerTransport,
recv_packet_from_chan,
route_trait::{ForeignNetworkRouteInfoMap, NextHopPolicy, RouteInterface},
route_trait::{ForeignNetworkRouteInfoMap, MockRoute, NextHopPolicy, RouteInterface},
PeerPacketFilter,
},
proto::{
@@ -634,6 +634,7 @@ impl PeerManager {
let acl_filter = self.global_ctx.get_acl_filter().clone();
let global_ctx = self.global_ctx.clone();
let stats_mgr = self.global_ctx.stats_manager().clone();
let route = self.get_route();
let label_set =
LabelSet::new().with_label_type(LabelType::NetworkName(global_ctx.get_network_name()));
@@ -737,6 +738,7 @@ impl PeerManager {
true,
global_ctx.get_ipv4().map(|x| x.address()),
global_ctx.get_ipv6().map(|x| x.address()),
&route,
) {
continue;
}
@@ -914,7 +916,7 @@ impl PeerManager {
pub fn get_route(&self) -> Box<dyn Route + Send + Sync + 'static> {
match &self.route_algo_inst {
RouteAlgoInst::Ospf(route) => Box::new(route.clone()),
RouteAlgoInst::None => panic!("no route"),
RouteAlgoInst::None => Box::new(MockRoute {}),
}
}
@@ -960,11 +962,13 @@ impl PeerManager {
}
async fn run_nic_packet_process_pipeline(&self, data: &mut ZCPacket) {
if !self
.global_ctx
.get_acl_filter()
.process_packet_with_acl(data, false, None, None)
{
if !self.global_ctx.get_acl_filter().process_packet_with_acl(
data,
false,
None,
None,
&self.get_route(),
) {
return;
}
+111 -1
View File
@@ -1,5 +1,7 @@
use std::{
collections::{BTreeMap, BTreeSet},
collections::{
HashMap, {BTreeMap, BTreeSet},
},
fmt::Debug,
net::{Ipv4Addr, Ipv6Addr},
sync::{
@@ -33,6 +35,7 @@ use crate::{
},
peers::route_trait::{Route, RouteInterfaceBox},
proto::{
acl::GroupIdentity,
common::{Ipv4Inet, NatType, StunInfo},
peer_rpc::{
route_foreign_network_infos, route_foreign_network_summary,
@@ -127,6 +130,7 @@ impl RoutePeerInfo {
network_length: 24,
quic_port: None,
ipv6_addr: None,
groups: Vec::new(),
}
}
@@ -168,6 +172,8 @@ impl RoutePeerInfo {
quic_port: global_ctx.get_quic_proxy_port().map(|x| x as u32),
ipv6_addr: global_ctx.get_ipv6().map(|x| x.into()),
groups: global_ctx.get_acl_groups(my_peer_id),
};
let need_update_periodically = if let Ok(Ok(d)) =
@@ -296,6 +302,8 @@ struct SyncedRouteInfo {
raw_peer_infos: DashMap<PeerId, DynamicMessage>,
conn_map: DashMap<PeerId, (BTreeSet<PeerId>, AtomicVersion)>,
foreign_network: DashMap<ForeignNetworkRouteInfoKey, ForeignNetworkRouteInfoEntry>,
group_trust_map: DashMap<PeerId, HashMap<String, Vec<u8>>>,
group_trust_map_cache: DashMap<PeerId, Arc<Vec<String>>>, // cache for group trust map, should sync with group_trust_map
version: AtomicVersion,
}
@@ -306,6 +314,7 @@ impl Debug for SyncedRouteInfo {
.field("peer_infos", &self.peer_infos)
.field("conn_map", &self.conn_map)
.field("foreign_network", &self.foreign_network)
.field("group_trust_map", &self.group_trust_map)
.field("version", &self.version.get())
.finish()
}
@@ -324,6 +333,8 @@ impl SyncedRouteInfo {
self.raw_peer_infos.remove(&peer_id);
self.conn_map.remove(&peer_id);
self.foreign_network.retain(|k, _| k.peer_id != peer_id);
self.group_trust_map.remove(&peer_id);
self.group_trust_map_cache.remove(&peer_id);
self.version.inc();
}
@@ -613,6 +624,85 @@ impl SyncedRouteInfo {
self.is_peer_bidirectly_connected(src_peer_id, dst_peer_id)
|| self.is_peer_bidirectly_connected(dst_peer_id, src_peer_id)
}
fn verify_and_update_group_trusts(
&self,
peer_infos: &[RoutePeerInfo],
local_group_declarations: &[GroupIdentity],
) {
let local_group_declarations = local_group_declarations
.iter()
.map(|g| (g.group_name.as_str(), g.group_secret.as_str()))
.collect::<std::collections::HashMap<&str, &str>>();
let verify_groups = |old_trusted_groups: Option<&HashMap<String, Vec<u8>>>,
info: &RoutePeerInfo|
-> HashMap<String, Vec<u8>> {
let mut trusted_groups_for_peer: HashMap<String, Vec<u8>> = HashMap::new();
for group_proof in &info.groups {
let name = &group_proof.group_name;
let proof_bytes = group_proof.group_proof.clone();
// If we already trusted this group and the proof hasn't changed, reuse it.
if old_trusted_groups
.and_then(|g| g.get(name))
.map(|old| old == &proof_bytes)
.unwrap_or(false)
{
trusted_groups_for_peer.insert(name.clone(), proof_bytes);
continue;
}
if let Some(&local_secret) =
local_group_declarations.get(group_proof.group_name.as_str())
{
if group_proof.verify(local_secret, info.peer_id) {
trusted_groups_for_peer.insert(name.clone(), proof_bytes);
} else {
tracing::warn!(
peer_id = info.peer_id,
group = %group_proof.group_name,
"Group proof verification failed"
);
}
}
}
trusted_groups_for_peer
};
for info in peer_infos {
match self.group_trust_map.entry(info.peer_id) {
dashmap::mapref::entry::Entry::Occupied(mut entry) => {
let old_trusted_groups = entry.get().clone();
let trusted_groups_for_peer = verify_groups(Some(&old_trusted_groups), info);
if trusted_groups_for_peer.is_empty() {
entry.remove();
self.group_trust_map_cache.remove(&info.peer_id);
} else {
self.group_trust_map_cache.insert(
info.peer_id,
Arc::new(trusted_groups_for_peer.keys().cloned().collect()),
);
*entry.get_mut() = trusted_groups_for_peer;
}
}
dashmap::mapref::entry::Entry::Vacant(entry) => {
let trusted_groups_for_peer = verify_groups(None, info);
if !trusted_groups_for_peer.is_empty() {
self.group_trust_map_cache.insert(
info.peer_id,
Arc::new(trusted_groups_for_peer.keys().cloned().collect()),
);
entry.insert(trusted_groups_for_peer);
}
}
}
}
}
}
type PeerGraph = Graph<PeerId, usize, Directed>;
@@ -1154,6 +1244,8 @@ impl PeerRouteServiceImpl {
raw_peer_infos: DashMap::new(),
conn_map: DashMap::new(),
foreign_network: DashMap::new(),
group_trust_map: DashMap::new(),
group_trust_map_cache: DashMap::new(),
version: AtomicVersion::new(),
},
cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::new()),
@@ -1679,6 +1771,14 @@ impl PeerRouteServiceImpl {
fn get_peer_info_last_update(&self) -> std::time::Instant {
self.peer_info_last_update.load()
}
fn get_peer_groups(&self, peer_id: PeerId) -> Arc<Vec<String>> {
self.synced_route_info
.group_trust_map_cache
.get(&peer_id)
.map(|groups| groups.value().clone())
.unwrap_or_default()
}
}
impl Drop for PeerRouteServiceImpl {
@@ -2016,6 +2116,12 @@ impl RouteSessionManager {
peer_infos,
raw_peer_infos.as_ref().unwrap(),
)?;
service_impl
.synced_route_info
.verify_and_update_group_trusts(
peer_infos,
&service_impl.global_ctx.get_acl_group_declarations(),
);
session.update_dst_saved_peer_info_version(peer_infos);
need_update_route_table = true;
}
@@ -2364,6 +2470,10 @@ impl Route for PeerRoute {
async fn get_peer_info_last_update_time(&self) -> Instant {
self.service_impl.get_peer_info_last_update()
}
fn get_peer_groups(&self, peer_id: PeerId) -> Arc<Vec<String>> {
self.service_impl.get_peer_groups(peer_id)
}
}
impl PeerPacketFilter for Arc<PeerRoute> {}
+49
View File
@@ -122,9 +122,58 @@ pub trait Route {
async fn get_peer_info_last_update_time(&self) -> std::time::Instant;
fn get_peer_groups(&self, peer_id: PeerId) -> Arc<Vec<String>>;
async fn get_peer_groups_by_ip(&self, ip: &std::net::IpAddr) -> Arc<Vec<String>> {
match self.get_peer_id_by_ip(ip).await {
Some(peer_id) => self.get_peer_groups(peer_id),
None => Arc::new(Vec::new()),
}
}
async fn get_peer_groups_by_ipv4(&self, ipv4: &Ipv4Addr) -> Arc<Vec<String>> {
match self.get_peer_id_by_ipv4(ipv4).await {
Some(peer_id) => self.get_peer_groups(peer_id),
None => Arc::new(Vec::new()),
}
}
async fn dump(&self) -> String {
"this route implementation does not support dump".to_string()
}
}
pub type ArcRoute = Arc<Box<dyn Route + Send + Sync>>;
pub struct MockRoute {}
#[async_trait::async_trait]
impl Route for MockRoute {
async fn open(&self, _interface: RouteInterfaceBox) -> Result<u8, ()> {
panic!("mock route")
}
async fn close(&self) {
panic!("mock route")
}
async fn get_next_hop(&self, _peer_id: PeerId) -> Option<PeerId> {
panic!("mock route")
}
async fn list_routes(&self) -> Vec<crate::proto::cli::Route> {
panic!("mock route")
}
async fn get_peer_info(&self, _peer_id: PeerId) -> Option<RoutePeerInfo> {
panic!("mock route")
}
async fn get_peer_info_last_update_time(&self) -> std::time::Instant {
panic!("mock route")
}
fn get_peer_groups(&self, _peer_id: PeerId) -> Arc<Vec<String>> {
panic!("mock route")
}
}