Compare commits

...

3 Commits

Author SHA1 Message Date
fanyang 253597ed02 Address review comments 2026-05-03 12:47:12 +08:00
fanyang 6887d04233 fix: allow stateful ACL reverse traffic 2026-05-03 12:14:00 +08:00
fanyang 05657db371 test: reproduce stateful ACL reverse traffic drop 2026-05-03 12:10:54 +08:00
+134 -15
View File
@@ -39,6 +39,7 @@ pub struct RateLimitValue {
pub enum RuleId {
Priority(u32),
Stateful(u32),
StatefulReverse,
Default,
}
@@ -48,6 +49,7 @@ impl RuleId {
match self {
RuleId::Priority(p) => p.to_string(),
RuleId::Stateful(p) => format!("stateful-{}", p),
RuleId::StatefulReverse => "stateful-reverse".to_string(),
RuleId::Default => "default".to_string(),
}
}
@@ -482,20 +484,30 @@ impl AclProcessor {
stats
}
/// Process a packet through ACL rules - Now lock-free!
/// Process a packet through ACL rules.
pub fn process_packet(&self, packet_info: &PacketInfo, chain_type: ChainType) -> AclResult {
// Check cache first for performance
let cache_key = AclCacheKey::from_packet_info(packet_info, chain_type);
// If cache hit and can skip checks, return cached result
// If cache hit and can skip checks, return cached result. Cached drops may be
// overridden by a stateful reverse connection that was created after caching.
if let Some(mut cached) = self.rule_cache.get_mut(&cache_key) {
// Update last access time for LRU
cached.last_access = Instant::now();
self.increment_stat(AclStatKey::CacheHits);
if cached.acl_result.as_ref().map(|r| r.action) == Some(Action::Drop)
&& let Some(result) = self.check_reverse_connection(packet_info)
{
return result;
}
return self.process_packet_with_cache_entry(packet_info, &cached);
}
if let Some(result) = self.check_reverse_connection(packet_info) {
return result;
}
// Direct access to rules - no locks needed!
let rules = match chain_type {
ChainType::Inbound => &self.inbound_rules,
@@ -730,28 +742,68 @@ impl AclProcessor {
}
fn conn_track_key(&self, packet_info: &PacketInfo) -> String {
Self::make_conn_track_key(
packet_info.src_ip,
packet_info.src_port,
packet_info.dst_ip,
packet_info.dst_port,
)
}
fn reverse_conn_track_key(&self, packet_info: &PacketInfo) -> String {
Self::make_conn_track_key(
packet_info.dst_ip,
packet_info.dst_port,
packet_info.src_ip,
packet_info.src_port,
)
}
fn make_conn_track_key(
src_ip: IpAddr,
src_port: Option<u16>,
dst_ip: IpAddr,
dst_port: Option<u16>,
) -> String {
format!(
"{}:{}->{}:{}",
packet_info.src_ip,
packet_info.src_port.unwrap_or(0),
packet_info.dst_ip,
packet_info.dst_port.unwrap_or(0)
src_ip,
src_port.unwrap_or(0),
dst_ip,
dst_port.unwrap_or(0)
)
}
fn check_reverse_connection(&self, packet_info: &PacketInfo) -> Option<AclResult> {
let reverse_key = self.reverse_conn_track_key(packet_info);
let mut entry = self.conn_track.get_mut(&reverse_key)?;
Self::update_conn_track_entry(entry.value_mut(), packet_info);
Some(AclResult {
action: Action::Allow,
matched_rule: Some(RuleId::StatefulReverse),
should_log: false,
log_context: Some(AclLogContext::StatefulMatch {
src_ip: packet_info.src_ip,
dst_ip: packet_info.dst_ip,
}),
})
}
fn update_conn_track_entry(entry: &mut ConnTrackEntry, packet_info: &PacketInfo) {
entry.last_seen = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
entry.packet_count += 1;
entry.byte_count += packet_info.packet_size as u64;
entry.state = ConnState::Established as i32;
}
/// Check connection state for stateful rules
fn check_connection_state(&self, conn_track_key: &str, packet_info: &PacketInfo) {
self.conn_track
.entry(conn_track_key.to_string())
.and_modify(|x| {
x.last_seen = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
x.packet_count += 1;
x.byte_count += packet_info.packet_size as u64;
x.state = ConnState::Established as i32;
})
.and_modify(|x| Self::update_conn_track_entry(x, packet_info))
.or_insert_with(|| ConnTrackEntry {
src_addr: Some(
SocketAddr::new(packet_info.src_ip, packet_info.src_port.unwrap_or(0)).into(),
@@ -1382,6 +1434,73 @@ mod tests {
}
}
#[test]
fn test_stateful_allows_reverse_traffic_before_default_drop() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let _runtime_guard = runtime.enter();
let mut acl_config = Acl::default();
let mut acl_v1 = AclV1::default();
let mut outbound_chain = Chain {
name: "outbound_stateful".to_string(),
chain_type: ChainType::Outbound as i32,
enabled: true,
default_action: Action::Drop as i32,
..Default::default()
};
outbound_chain.rules.push(Rule {
name: "allow_out_stateful".to_string(),
priority: 100,
enabled: true,
action: Action::Allow as i32,
protocol: Protocol::Tcp as i32,
stateful: true,
..Default::default()
});
let inbound_chain = Chain {
name: "inbound_default_drop".to_string(),
chain_type: ChainType::Inbound as i32,
enabled: true,
default_action: Action::Drop as i32,
..Default::default()
};
acl_v1.chains.push(outbound_chain);
acl_v1.chains.push(inbound_chain);
acl_config.acl_v1 = Some(acl_v1);
let processor = AclProcessor::new(acl_config);
let outbound_packet = PacketInfo {
src_ip: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
dst_ip: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
src_port: Some(12345),
dst_port: Some(22),
protocol: Protocol::Tcp,
packet_size: 64,
src_groups: Arc::new(vec![]),
dst_groups: Arc::new(vec![]),
};
let inbound_reply = PacketInfo {
src_ip: outbound_packet.dst_ip,
dst_ip: outbound_packet.src_ip,
src_port: outbound_packet.dst_port,
dst_port: outbound_packet.src_port,
protocol: outbound_packet.protocol,
packet_size: 64,
src_groups: Arc::new(vec![]),
dst_groups: Arc::new(vec![]),
};
let outbound_result = processor.process_packet(&outbound_packet, ChainType::Outbound);
assert_eq!(outbound_result.action, Action::Allow);
let inbound_result = processor.process_packet(&inbound_reply, ChainType::Inbound);
assert_eq!(inbound_result.action, Action::Allow);
assert_eq!(inbound_result.matched_rule, Some(RuleId::StatefulReverse));
}
#[test]
fn test_acl_cache_key_creation() {
let packet_info = create_test_packet_info();