Implement ACL (#1140)

1. get acl stats
```
./easytier-cli acl stats
AclStats:
  Global:
    CacheHits: 4
    CacheMaxSize: 10000
    CacheSize: 5
    DefaultAllows: 3
    InboundPacketsAllowed: 2
    InboundPacketsTotal: 2
    OutboundPacketsAllowed: 7
    OutboundPacketsTotal: 7
    PacketsAllowed: 9
    PacketsTotal: 9
    RuleMatches: 2
  ConnTrack:
    [src: 10.14.11.1:57444, dst: 10.14.11.2:1000, proto: Tcp, state: New, pkts: 1, bytes: 60, created: 2025-07-24 10:13:39 +08:00, last_seen: 2025-07-24 10:13:39 +08:00]
  Rules:
    [name: 'tcp_whitelist', prio: 1000, action: Allow, enabled: true, proto: Tcp, ports: ["1000"], src_ports: [], src_ips: [], dst_ips: [], stateful: true, rate: 0, burst: 0] [pkts: 2, bytes: 120]

  ```
2. use tcp/udp whitelist to block unexpected traffic.
   `sudo ./easytier-core -d --tcp-whitelist 1000`

3. use complete acl ability with config file:

```
[[acl.acl_v1.chains]]
name = "inbound_whitelist"
chain_type = 1
description = "Auto-generated inbound whitelist from CLI"
enabled = true
default_action = 2

[[acl.acl_v1.chains.rules]]
name = "tcp_whitelist"
description = "Auto-generated TCP whitelist rule"
priority = 1000
enabled = true
protocol = 1
ports = ["1000"]
source_ips = []
destination_ips = []
source_ports = []
action = 1
rate_limit = 0
burst_limit = 0
stateful = true

```
This commit is contained in:
Sijie.Sun
2025-07-24 22:13:45 +08:00
committed by GitHub
parent 4f53fccd25
commit 8e7a8de5e5
22 changed files with 2377 additions and 28 deletions
+289
View File
@@ -0,0 +1,289 @@
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::atomic::Ordering;
use std::{
net::IpAddr,
sync::{atomic::AtomicBool, Arc},
};
use arc_swap::ArcSwap;
use pnet::packet::ipv6::Ipv6Packet;
use pnet::packet::{
ip::IpNextHeaderProtocols, ipv4::Ipv4Packet, tcp::TcpPacket, udp::UdpPacket, Packet as _,
};
use crate::proto::acl::{AclStats, Protocol};
use crate::tunnel::packet_def::PacketType;
use crate::{
common::acl_processor::{AclProcessor, AclResult, AclStatKey, AclStatType, PacketInfo},
proto::acl::{Acl, Action, ChainType},
tunnel::packet_def::ZCPacket,
};
/// ACL filter that can be inserted into the packet processing pipeline
/// Optimized with lock-free hot reloading via atomic processor replacement
pub struct AclFilter {
// Use ArcSwap for lock-free atomic replacement during hot reload
acl_processor: ArcSwap<AclProcessor>,
acl_enabled: Arc<AtomicBool>,
}
impl AclFilter {
pub fn new() -> Self {
Self {
acl_processor: ArcSwap::from(Arc::new(AclProcessor::new(Acl::default()))),
acl_enabled: Arc::new(AtomicBool::new(false)),
}
}
/// Hot reload ACL rules by creating a new processor instance
/// Preserves connection tracking and rate limiting state across reloads
/// Now lock-free and doesn't require &mut self!
pub fn reload_rules(&self, acl_config: Option<&Acl>) {
let Some(acl_config) = acl_config else {
self.acl_enabled.store(false, Ordering::Relaxed);
return;
};
// Get current processor to extract shared state
let current_processor = self.acl_processor.load();
let (conn_track, rate_limiters, stats) = current_processor.get_shared_state();
// Create new processor with preserved state
let new_processor = AclProcessor::new_with_shared_state(
acl_config.clone(),
Some(conn_track),
Some(rate_limiters),
Some(stats),
);
// Atomic replacement - this is completely lock-free!
self.acl_processor.store(Arc::new(new_processor));
self.acl_enabled.store(true, Ordering::Relaxed);
tracing::info!("ACL rules hot reloaded with preserved state (lock-free)");
}
/// Get current processor for processing packets
fn get_processor(&self) -> Arc<AclProcessor> {
self.acl_processor.load_full()
}
pub fn get_stats(&self) -> AclStats {
let processor = self.get_processor();
let global_stats = processor.get_stats();
let (conn_track, _, _) = processor.get_shared_state();
let rules_stats = processor.get_rules_stats();
AclStats {
global: global_stats.into_iter().map(|(k, v)| (k, v)).collect(),
conn_track: conn_track.iter().map(|x| x.value().clone()).collect(),
rules: rules_stats,
}
}
/// Extract packet information for ACL processing
fn extract_packet_info(&self, packet: &ZCPacket) -> Option<PacketInfo> {
let payload = packet.payload();
let src_ip;
let dst_ip;
let src_port;
let dst_port;
let protocol;
let ipv4_packet = Ipv4Packet::new(payload)?;
if ipv4_packet.get_version() == 4 {
src_ip = IpAddr::V4(ipv4_packet.get_source());
dst_ip = IpAddr::V4(ipv4_packet.get_destination());
protocol = ipv4_packet.get_next_level_protocol();
(src_port, dst_port) = match protocol {
IpNextHeaderProtocols::Tcp => {
let tcp_packet = TcpPacket::new(ipv4_packet.payload())?;
(
Some(tcp_packet.get_source()),
Some(tcp_packet.get_destination()),
)
}
IpNextHeaderProtocols::Udp => {
let udp_packet = UdpPacket::new(ipv4_packet.payload())?;
(
Some(udp_packet.get_source()),
Some(udp_packet.get_destination()),
)
}
_ => (None, None),
};
} else if ipv4_packet.get_version() == 6 {
let ipv6_packet = Ipv6Packet::new(payload)?;
src_ip = IpAddr::V6(ipv6_packet.get_source());
dst_ip = IpAddr::V6(ipv6_packet.get_destination());
protocol = ipv6_packet.get_next_header();
(src_port, dst_port) = match protocol {
IpNextHeaderProtocols::Tcp => {
let tcp_packet = TcpPacket::new(ipv6_packet.payload())?;
(
Some(tcp_packet.get_source()),
Some(tcp_packet.get_destination()),
)
}
IpNextHeaderProtocols::Udp => {
let udp_packet = UdpPacket::new(ipv6_packet.payload())?;
(
Some(udp_packet.get_source()),
Some(udp_packet.get_destination()),
)
}
_ => (None, None),
};
} else {
return None;
}
let acl_protocol = match protocol {
IpNextHeaderProtocols::Tcp => Protocol::Tcp,
IpNextHeaderProtocols::Udp => Protocol::Udp,
IpNextHeaderProtocols::Icmp => Protocol::Icmp,
IpNextHeaderProtocols::Icmpv6 => Protocol::IcmPv6,
_ => Protocol::Unspecified,
};
Some(PacketInfo {
src_ip,
dst_ip,
src_port,
dst_port,
protocol: acl_protocol,
packet_size: payload.len(),
})
}
/// Process ACL result and log if needed
fn handle_acl_result(
&self,
result: &AclResult,
packet_info: &PacketInfo,
chain_type: ChainType,
processor: &AclProcessor,
) {
if result.should_log {
if let Some(ref log_context) = result.log_context {
let log_message = log_context.to_message();
tracing::info!(
src_ip = %packet_info.src_ip,
dst_ip = %packet_info.dst_ip,
src_port = packet_info.src_port,
dst_port = packet_info.dst_port,
protocol = ?packet_info.protocol,
action = ?result.action,
rule = result.matched_rule_str().as_deref().unwrap_or("unknown"),
chain_type = ?chain_type,
"ACL: {}", log_message
);
}
}
// Update global statistics in the ACL processor
match result.action {
Action::Allow => {
processor.increment_stat(AclStatKey::PacketsAllowed);
processor.increment_stat(AclStatKey::from_chain_and_action(
chain_type,
AclStatType::Allowed,
));
tracing::trace!("ACL: Packet allowed");
}
Action::Drop => {
processor.increment_stat(AclStatKey::PacketsDropped);
processor.increment_stat(AclStatKey::from_chain_and_action(
chain_type,
AclStatType::Dropped,
));
tracing::debug!("ACL: Packet dropped");
}
Action::Noop => {
processor.increment_stat(AclStatKey::PacketsNoop);
processor.increment_stat(AclStatKey::from_chain_and_action(
chain_type,
AclStatType::Noop,
));
tracing::trace!("ACL: No operation");
}
}
// Track total packets processed per chain
processor.increment_stat(AclStatKey::from_chain_and_action(
chain_type,
AclStatType::Total,
));
processor.increment_stat(AclStatKey::PacketsTotal);
}
/// Common ACL processing logic
pub fn process_packet_with_acl(
&self,
packet: &ZCPacket,
is_in: bool,
my_ipv4: Option<Ipv4Addr>,
my_ipv6: Option<Ipv6Addr>,
) -> bool {
if !self.acl_enabled.load(Ordering::Relaxed) {
return true;
}
if packet.peer_manager_header().unwrap().packet_type != PacketType::Data as u8 {
return true;
}
// Extract packet information
let packet_info = match self.extract_packet_info(packet) {
Some(info) => info,
None => {
tracing::warn!(
"Failed to extract packet info from {:?} packet, header: {:?}",
if is_in { "inbound" } else { "outbound" },
packet.peer_manager_header()
);
// allow all unknown packets
return true;
}
};
let chain_type = if is_in {
if packet_info.dst_ip == my_ipv4.unwrap_or(Ipv4Addr::UNSPECIFIED)
|| packet_info.dst_ip == my_ipv6.unwrap_or(Ipv6Addr::UNSPECIFIED)
{
ChainType::Inbound
} else {
ChainType::Forward
}
} else {
ChainType::Outbound
};
// Get current processor atomically
let processor = self.get_processor();
// Process through ACL rules
let acl_result = processor.process_packet(&packet_info, chain_type);
self.handle_acl_result(&acl_result, &packet_info, chain_type, &processor);
// Check if packet should be allowed
match acl_result.action {
Action::Allow | Action::Noop => true,
Action::Drop => {
tracing::trace!(
"ACL: Dropping {:?} packet from {} to {}, chain_type: {:?}",
packet_info.protocol,
packet_info.src_ip,
packet_info.dst_ip,
chain_type,
);
false
}
}
}
}
+1
View File
@@ -1,5 +1,6 @@
mod graph_algo;
pub mod acl_filter;
pub mod peer;
// pub mod peer_conn;
pub mod peer_conn;
+19
View File
@@ -573,6 +573,8 @@ impl PeerManager {
let foreign_mgr = self.foreign_network_manager.clone();
let encryptor = self.encryptor.clone();
let compress_algo = self.data_compress_algo;
let acl_filter = self.global_ctx.get_acl_filter().clone();
let global_ctx = self.global_ctx.clone();
self.tasks.lock().await.spawn(async move {
tracing::trace!("start_peer_recv");
while let Ok(ret) = recv_packet_from_chan(&mut recv).await {
@@ -631,6 +633,15 @@ impl PeerManager {
continue;
}
if !acl_filter.process_packet_with_acl(
&ret,
true,
global_ctx.get_ipv4().map(|x| x.address()),
global_ctx.get_ipv6().map(|x| x.address()),
) {
continue;
}
let mut processed = false;
let mut zc_packet = Some(ret);
let mut idx = 0;
@@ -845,6 +856,14 @@ impl PeerManager {
}
async fn run_nic_packet_process_pipeline(&self, data: &mut ZCPacket) {
if !self
.global_ctx
.get_acl_filter()
.process_packet_with_acl(data, false, None, None)
{
return;
}
for pipeline in self.nic_packet_process_pipeline.read().await.iter().rev() {
let _ = pipeline.try_process_packet_from_nic(data).await;
}
+24 -4
View File
@@ -2,10 +2,10 @@ use std::sync::Arc;
use crate::proto::{
cli::{
DumpRouteRequest, DumpRouteResponse, ListForeignNetworkRequest, ListForeignNetworkResponse,
ListGlobalForeignNetworkRequest, ListGlobalForeignNetworkResponse, ListPeerRequest,
ListPeerResponse, ListRouteRequest, ListRouteResponse, PeerInfo, PeerManageRpc,
ShowNodeInfoRequest, ShowNodeInfoResponse,
AclManageRpc, DumpRouteRequest, DumpRouteResponse, GetAclStatsRequest, GetAclStatsResponse,
ListForeignNetworkRequest, ListForeignNetworkResponse, ListGlobalForeignNetworkRequest,
ListGlobalForeignNetworkResponse, ListPeerRequest, ListPeerResponse, ListRouteRequest,
ListRouteResponse, PeerInfo, PeerManageRpc, ShowNodeInfoRequest, ShowNodeInfoResponse,
},
rpc_types::{self, controller::BaseController},
};
@@ -134,3 +134,23 @@ impl PeerManageRpc for PeerManagerRpcService {
})
}
}
#[async_trait::async_trait]
impl AclManageRpc for PeerManagerRpcService {
type Controller = BaseController;
async fn get_acl_stats(
&self,
_: BaseController,
_request: GetAclStatsRequest,
) -> Result<GetAclStatsResponse, rpc_types::error::Error> {
let acl_stats = self
.peer_manager
.get_global_ctx()
.get_acl_filter()
.get_stats();
Ok(GetAclStatsResponse {
acl_stats: Some(acl_stats),
})
}
}