fix acl not work with kcp&quic (#1152)

This commit is contained in:
Sijie.Sun
2025-07-26 14:38:10 +08:00
committed by GitHub
parent 5409c5bbe7
commit 354a4e1d7b
6 changed files with 380 additions and 24 deletions
+1
View File
@@ -470,6 +470,7 @@ impl AclProcessor {
let rules = match chain_type { let rules = match chain_type {
ChainType::Inbound => &self.inbound_rules, ChainType::Inbound => &self.inbound_rules,
ChainType::Outbound => &self.outbound_rules, ChainType::Outbound => &self.outbound_rules,
ChainType::Forward => &self.forward_rules,
_ => { _ => {
return AclResult { return AclResult {
action: Action::Drop, action: Action::Drop,
+82 -8
View File
@@ -20,7 +20,12 @@ use pnet::packet::{
Packet as _, Packet as _,
}; };
use prost::Message; use prost::Message;
use tokio::{io::copy_bidirectional, select, task::JoinSet}; use tokio::{
io::{copy_bidirectional, AsyncRead, AsyncWrite},
select,
task::JoinSet,
};
use tokio_util::io::InspectReader;
use super::{ use super::{
tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy}, tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy},
@@ -28,11 +33,13 @@ use super::{
}; };
use crate::{ use crate::{
common::{ common::{
acl_processor::PacketInfo,
error::Result, error::Result,
global_ctx::{ArcGlobalCtx, GlobalCtx}, global_ctx::{ArcGlobalCtx, GlobalCtx},
}, },
peers::{peer_manager::PeerManager, NicPacketFilter, PeerPacketFilter}, peers::{acl_filter::AclFilter, peer_manager::PeerManager, NicPacketFilter, PeerPacketFilter},
proto::{ proto::{
acl::{Action, ChainType, Protocol},
cli::{ cli::{
ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState, ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyEntryTransportType, TcpProxyRpc,
@@ -372,6 +379,50 @@ pub struct KcpProxyDst {
tasks: JoinSet<()>, tasks: JoinSet<()>,
} }
#[derive(Clone)]
pub struct ProxyAclHandler {
pub acl_filter: Arc<AclFilter>,
pub packet_info: PacketInfo,
pub chain_type: ChainType,
}
impl ProxyAclHandler {
pub fn handle_packet(&self, buf: &[u8]) -> Result<()> {
let mut packet_info = self.packet_info.clone();
packet_info.packet_size = buf.len();
let ret = self
.acl_filter
.get_processor()
.process_packet(&packet_info, self.chain_type);
self.acl_filter.handle_acl_result(
&ret,
&packet_info,
self.chain_type,
&self.acl_filter.get_processor(),
);
if !matches!(ret.action, Action::Allow) {
return Err(anyhow::anyhow!("acl denied").into());
}
Ok(())
}
pub async fn copy_bidirection_with_acl(
&self,
src: impl AsyncRead + AsyncWrite + Unpin,
mut dst: impl AsyncRead + AsyncWrite + Unpin,
) -> Result<()> {
let (src_reader, src_writer) = tokio::io::split(src);
let src_reader = InspectReader::new(src_reader, |buf| {
let _ = self.handle_packet(buf);
});
let mut src = tokio::io::join(src_reader, src_writer);
copy_bidirectional(&mut src, &mut dst).await?;
Ok(())
}
}
impl KcpProxyDst { impl KcpProxyDst {
pub async fn new(peer_manager: Arc<PeerManager>) -> Self { pub async fn new(peer_manager: Arc<PeerManager>) -> Self {
let mut kcp_endpoint = create_kcp_endpoint(); let mut kcp_endpoint = create_kcp_endpoint();
@@ -396,7 +447,7 @@ impl KcpProxyDst {
#[tracing::instrument(ret)] #[tracing::instrument(ret)]
async fn handle_one_in_stream( async fn handle_one_in_stream(
mut kcp_stream: KcpStream, kcp_stream: KcpStream,
global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx,
proxy_entries: Arc<DashMap<ConnId, TcpProxyEntry>>, proxy_entries: Arc<DashMap<ConnId, TcpProxyEntry>>,
cidr_set: Arc<CidrSet>, cidr_set: Arc<CidrSet>,
@@ -411,6 +462,7 @@ impl KcpProxyDst {
parsed_conn_data parsed_conn_data
))? ))?
.into(); .into();
let src_socket: SocketAddr = parsed_conn_data.src.unwrap_or_default().into();
match dst_socket.ip() { match dst_socket.ip() {
IpAddr::V4(dst_v4_ip) => { IpAddr::V4(dst_v4_ip) => {
@@ -437,17 +489,36 @@ impl KcpProxyDst {
proxy_entries.remove(&conn_id); proxy_entries.remove(&conn_id);
} }
if Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address())) let send_to_self =
&& global_ctx.no_tun() Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address()));
{
if send_to_self && global_ctx.no_tun() {
dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap(); dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap();
} }
let acl_handler = ProxyAclHandler {
acl_filter: global_ctx.get_acl_filter().clone(),
packet_info: PacketInfo {
src_ip: src_socket.ip(),
dst_ip: dst_socket.ip(),
src_port: Some(src_socket.port()),
dst_port: Some(dst_socket.port()),
protocol: Protocol::Tcp,
packet_size: conn_data.len(),
},
chain_type: if send_to_self {
ChainType::Inbound
} else {
ChainType::Forward
},
};
acl_handler.handle_packet(&conn_data)?;
tracing::debug!("kcp connect to dst socket: {:?}", dst_socket); tracing::debug!("kcp connect to dst socket: {:?}", dst_socket);
let _g = global_ctx.net_ns.guard(); let _g = global_ctx.net_ns.guard();
let connector = NatDstTcpConnector {}; let connector = NatDstTcpConnector {};
let mut ret = connector let ret = connector
.connect("0.0.0.0:0".parse().unwrap(), dst_socket) .connect("0.0.0.0:0".parse().unwrap(), dst_socket)
.await?; .await?;
@@ -455,7 +526,10 @@ impl KcpProxyDst {
e.state = TcpProxyEntryState::Connected.into(); e.state = TcpProxyEntryState::Connected.into();
} }
copy_bidirectional(&mut ret, &mut kcp_stream).await?; acl_handler
.copy_bidirection_with_acl(kcp_stream, ret)
.await?;
Ok(()) Ok(())
} }
+30 -8
View File
@@ -7,19 +7,21 @@ use dashmap::DashMap;
use pnet::packet::ipv4::Ipv4Packet; use pnet::packet::ipv4::Ipv4Packet;
use prost::Message as _; use prost::Message as _;
use quinn::{Endpoint, Incoming}; use quinn::{Endpoint, Incoming};
use tokio::io::{copy_bidirectional, AsyncRead, AsyncReadExt, AsyncWrite}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio::time::timeout; use tokio::time::timeout;
use crate::common::acl_processor::PacketInfo;
use crate::common::error::Result; use crate::common::error::Result;
use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx}; use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx};
use crate::common::join_joinset_background; use crate::common::join_joinset_background;
use crate::defer; use crate::defer;
use crate::gateway::kcp_proxy::TcpProxyForKcpSrcTrait; use crate::gateway::kcp_proxy::{ProxyAclHandler, TcpProxyForKcpSrcTrait};
use crate::gateway::tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy}; use crate::gateway::tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy};
use crate::gateway::CidrSet; use crate::gateway::CidrSet;
use crate::peers::peer_manager::PeerManager; use crate::peers::peer_manager::PeerManager;
use crate::proto::acl::{ChainType, Protocol};
use crate::proto::cli::{ use crate::proto::cli::{
ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState, ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyEntryTransportType, TcpProxyRpc,
@@ -322,12 +324,13 @@ impl QUICProxyDst {
.await; .await;
match ret { match ret {
Ok(Ok((mut quic_stream, mut tcp_stream))) => { Ok(Ok((quic_stream, tcp_stream, acl))) => {
let ret = copy_bidirectional(&mut quic_stream, &mut tcp_stream).await; let remote_addr = quic_stream.connection.as_ref().map(|c| c.remote_address());
let ret = acl.copy_bidirection_with_acl(quic_stream, tcp_stream).await;
tracing::info!( tracing::info!(
"QUIC connection handled, result: {:?}, remote addr: {:?}", "QUIC connection handled, result: {:?}, remote addr: {:?}",
ret, ret,
quic_stream.connection.as_ref().map(|c| c.remote_address()) remote_addr,
); );
} }
Ok(Err(e)) => { Ok(Err(e)) => {
@@ -345,7 +348,7 @@ impl QUICProxyDst {
cidr_set: Arc<CidrSet>, cidr_set: Arc<CidrSet>,
proxy_entry_key: SocketAddr, proxy_entry_key: SocketAddr,
proxy_entries: Arc<DashMap<SocketAddr, TcpProxyEntry>>, proxy_entries: Arc<DashMap<SocketAddr, TcpProxyEntry>>,
) -> Result<(QUICStream, TcpStream)> { ) -> Result<(QUICStream, TcpStream, ProxyAclHandler)> {
let conn = incoming.await.with_context(|| "accept failed")?; let conn = incoming.await.with_context(|| "accept failed")?;
let addr = conn.remote_address(); let addr = conn.remote_address();
tracing::info!("Accepted QUIC connection from {}", addr); tracing::info!("Accepted QUIC connection from {}", addr);
@@ -376,7 +379,8 @@ impl QUICProxyDst {
dst_socket.set_ip(real_ip); dst_socket.set_ip(real_ip);
} }
if Some(*dst_socket.ip()) == ctx.get_ipv4().map(|ip| ip.address()) && ctx.no_tun() { let send_to_self = Some(*dst_socket.ip()) == ctx.get_ipv4().map(|ip| ip.address());
if send_to_self && ctx.no_tun() {
dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap(); dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap();
} }
@@ -391,6 +395,24 @@ impl QUICProxyDst {
}, },
); );
let acl_handler = ProxyAclHandler {
acl_filter: ctx.get_acl_filter().clone(),
packet_info: PacketInfo {
src_ip: addr.ip(),
dst_ip: (*dst_socket.ip()).into(),
src_port: Some(addr.port()),
dst_port: Some(dst_socket.port()),
protocol: Protocol::Tcp,
packet_size: len as usize,
},
chain_type: if send_to_self {
ChainType::Inbound
} else {
ChainType::Forward
},
};
acl_handler.handle_packet(&buf)?;
let connector = NatDstTcpConnector {}; let connector = NatDstTcpConnector {};
let dst_stream = { let dst_stream = {
@@ -411,7 +433,7 @@ impl QUICProxyDst {
receiver: r, receiver: r,
}; };
Ok((quic_stream, dst_stream)) Ok((quic_stream, dst_stream, acl_handler))
} }
} }
+7 -1
View File
@@ -63,7 +63,13 @@ pub struct NatDstTcpConnector;
impl NatDstConnector for NatDstTcpConnector { impl NatDstConnector for NatDstTcpConnector {
type DstStream = TcpStream; type DstStream = TcpStream;
async fn connect(&self, _src: SocketAddr, nat_dst: SocketAddr) -> Result<Self::DstStream> { async fn connect(&self, _src: SocketAddr, nat_dst: SocketAddr) -> Result<Self::DstStream> {
let socket = TcpSocket::new_v4().unwrap(); let socket = match TcpSocket::new_v4() {
Ok(s) => s,
Err(e) => {
eprintln!("create v4 socket failed: {:?}", e);
return Err(e.into());
}
};
if let Err(e) = socket.set_nodelay(true) { if let Err(e) = socket.set_nodelay(true) {
tracing::warn!("set_nodelay failed, ignore it: {:?}", e); tracing::warn!("set_nodelay failed, ignore it: {:?}", e);
} }
+2 -2
View File
@@ -64,7 +64,7 @@ impl AclFilter {
} }
/// Get current processor for processing packets /// Get current processor for processing packets
fn get_processor(&self) -> Arc<AclProcessor> { pub fn get_processor(&self) -> Arc<AclProcessor> {
self.acl_processor.load_full() self.acl_processor.load_full()
} }
@@ -160,7 +160,7 @@ impl AclFilter {
} }
/// Process ACL result and log if needed /// Process ACL result and log if needed
fn handle_acl_result( pub fn handle_acl_result(
&self, &self,
result: &AclResult, result: &AclResult,
packet_info: &PacketInfo, packet_info: &PacketInfo,
+258 -5
View File
@@ -1329,16 +1329,33 @@ async fn avoid_tunnel_loop_back_to_virtual_network() {
drop_insts(insts).await; drop_insts(insts).await;
} }
#[rstest::rstest]
#[tokio::test] #[tokio::test]
#[serial_test::serial] #[serial_test::serial]
pub async fn acl_rule_test_inbound() { pub async fn acl_rule_test_inbound(
#[values(true, false)] enable_kcp_proxy: bool,
#[values(true, false)] enable_quic_proxy: bool,
) {
use crate::tunnel::{ use crate::tunnel::{
common::tests::_tunnel_pingpong_netns, common::tests::_tunnel_pingpong_netns,
tcp::{TcpTunnelConnector, TcpTunnelListener}, tcp::{TcpTunnelConnector, TcpTunnelListener},
udp::{UdpTunnelConnector, UdpTunnelListener}, udp::{UdpTunnelConnector, UdpTunnelListener},
}; };
use rand::Rng; use rand::Rng;
let insts = init_three_node("udp").await; let insts = init_three_node_ex(
"udp",
|cfg| {
if cfg.get_inst_name() == "inst1" {
let mut flags = cfg.get_flags();
flags.enable_kcp_proxy = enable_kcp_proxy;
flags.enable_quic_proxy = enable_quic_proxy;
cfg.set_flags(flags);
}
cfg
},
false,
)
.await;
// 构造 ACL 配置 // 构造 ACL 配置
use crate::proto::acl::*; use crate::proto::acl::*;
@@ -1422,7 +1439,7 @@ pub async fn acl_rule_test_inbound() {
.await; .await;
// 6. 8080 应该连接失败(被 ACL 拦截) // 6. 8080 应该连接失败(被 ACL 拦截)
let result = tokio::time::timeout( let result = tokio::spawn(tokio::time::timeout(
std::time::Duration::from_millis(200), std::time::Duration::from_millis(200),
_tunnel_pingpong_netns( _tunnel_pingpong_netns(
listener_8080, listener_8080,
@@ -1431,10 +1448,13 @@ pub async fn acl_rule_test_inbound() {
NetNS::new(Some("net_a".into())), NetNS::new(Some("net_a".into())),
buf.clone(), buf.clone(),
), ),
) ))
.await; .await;
assert!(result.is_err(), "TCP 连接 8080 应被 ACL 拦截,不能成功"); assert!(
result.is_err() || result.unwrap().is_err(),
"TCP 连接 8080 应被 ACL 拦截,不能成功"
);
// 7. 从 10.144.144.2 连接 8082 应该连接失败(被 ACL 拦截) // 7. 从 10.144.144.2 连接 8082 应该连接失败(被 ACL 拦截)
let result = tokio::time::timeout( let result = tokio::time::timeout(
@@ -1508,3 +1528,236 @@ pub async fn acl_rule_test_inbound() {
drop_insts(insts).await; drop_insts(insts).await;
} }
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]
pub async fn acl_rule_test_subnet_proxy(
#[values(true, false)] enable_kcp_proxy: bool,
#[values(true, false)] enable_quic_proxy: bool,
) {
use crate::tunnel::{
common::tests::_tunnel_pingpong_netns,
tcp::{TcpTunnelConnector, TcpTunnelListener},
udp::{UdpTunnelConnector, UdpTunnelListener},
};
use rand::Rng;
let insts = init_three_node_ex(
"udp",
|cfg| {
if cfg.get_inst_name() == "inst1" {
let mut flags = cfg.get_flags();
flags.enable_kcp_proxy = enable_kcp_proxy;
flags.enable_quic_proxy = enable_quic_proxy;
cfg.set_flags(flags);
} else if cfg.get_inst_name() == "inst3" {
// 添加子网代理配置
cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None)
.unwrap();
}
cfg
},
false,
)
.await;
// 等待代理路由出现
wait_proxy_route_appear(
&insts[0].get_peer_manager(),
"10.144.144.3/24",
insts[2].peer_id(),
"10.1.2.0/24",
)
.await;
// Test IPv4 connectivity
wait_for_condition(
|| async { ping_test("net_a", "10.1.2.4", None).await },
Duration::from_secs(5),
)
.await;
// 构造 ACL 配置 - 针对子网代理流量
use crate::proto::acl::*;
let mut acl = Acl::default();
let mut acl_v1 = AclV1::default();
let mut chain = Chain::default();
chain.name = "test_subnet_proxy_inbound".to_string();
chain.chain_type = ChainType::Forward as i32;
chain.enabled = true;
// 禁止访问子网代理中的 8080 端口
let mut deny_rule = Rule::default();
deny_rule.name = "deny_subnet_8080".to_string();
deny_rule.priority = 200;
deny_rule.enabled = true;
deny_rule.action = Action::Drop as i32;
deny_rule.protocol = Protocol::Any as i32;
deny_rule.ports = vec!["8080".to_string()];
deny_rule.destination_ips = vec!["10.1.2.0/24".to_string()];
chain.rules.push(deny_rule);
// 禁止来自 inst1 (10.144.144.1) 访问子网代理中的 8081 端口
let mut deny_src_rule = Rule::default();
deny_src_rule.name = "deny_inst1_to_subnet_8081".to_string();
deny_src_rule.priority = 200;
deny_src_rule.enabled = true;
deny_src_rule.action = Action::Drop as i32;
deny_src_rule.protocol = Protocol::Any as i32;
deny_src_rule.ports = vec!["8081".to_string()];
deny_src_rule.source_ips = vec!["10.144.144.1/32".to_string()];
deny_src_rule.destination_ips = vec!["10.1.2.0/24".to_string()];
chain.rules.push(deny_src_rule);
// 允许其他流量
let mut allow_rule = Rule::default();
allow_rule.name = "allow_all".to_string();
allow_rule.priority = 100;
allow_rule.enabled = true;
allow_rule.action = Action::Allow as i32;
allow_rule.protocol = Protocol::Any as i32;
allow_rule.stateful = true;
chain.rules.push(allow_rule);
acl_v1.chains.push(chain);
acl.acl_v1 = Some(acl_v1);
// 在 inst3 上应用 ACL 规则
insts[2]
.get_global_ctx()
.get_acl_filter()
.reload_rules(Some(&acl));
// TCP 测试部分 - 测试子网代理的 ACL 规则
{
// 在 net_d (10.1.2.4) 上监听多个端口
let listener_8080 = TcpTunnelListener::new("tcp://0.0.0.0:8080".parse().unwrap());
let listener_8081 = TcpTunnelListener::new("tcp://0.0.0.0:8081".parse().unwrap());
let listener_8082 = TcpTunnelListener::new("tcp://0.0.0.0:8082".parse().unwrap());
// 从 inst1 (net_a) 连接到子网代理
let connector_8080 = TcpTunnelConnector::new("tcp://10.1.2.4:8080".parse().unwrap());
let connector_8081 = TcpTunnelConnector::new("tcp://10.1.2.4:8081".parse().unwrap());
let connector_8082 = TcpTunnelConnector::new("tcp://10.1.2.4:8082".parse().unwrap());
let mut buf = vec![0; 32];
rand::thread_rng().fill(&mut buf[..]);
// 8082 应该可以连接成功(不被 ACL 拦截)
_tunnel_pingpong_netns(
listener_8082,
connector_8082,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
)
.await;
// 8080 应该连接失败(被 ACL 拦截 - 禁止访问子网代理的 8080)
let result = tokio::spawn(tokio::time::timeout(
std::time::Duration::from_millis(200),
_tunnel_pingpong_netns(
listener_8080,
connector_8080,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
),
))
.await;
assert!(
result.is_err() || result.unwrap().is_err(),
"TCP 连接子网代理 8080 应被 ACL 拦截,不能成功"
);
// 8081 应该连接失败(被 ACL 拦截 - 禁止 inst1 访问子网代理的 8081
let result = tokio::spawn(tokio::time::timeout(
std::time::Duration::from_millis(200),
_tunnel_pingpong_netns(
listener_8081,
connector_8081,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
),
))
.await;
assert!(
result.is_err() || result.unwrap().is_err(),
"TCP 连接子网代理 8081 应被 ACL 拦截,不能成功"
);
let stats = insts[2].get_global_ctx().get_acl_filter().get_stats();
println!("ACL stats after TCP tests: {:?}", stats);
}
// UDP 测试部分 - 测试子网代理的 ACL 规则
{
let listener_8080 = UdpTunnelListener::new("udp://0.0.0.0:8080".parse().unwrap());
let listener_8082 = UdpTunnelListener::new("udp://0.0.0.0:8082".parse().unwrap());
let connector_8080 = UdpTunnelConnector::new("udp://10.1.2.4:8080".parse().unwrap());
let connector_8082 = UdpTunnelConnector::new("udp://10.1.2.4:8082".parse().unwrap());
let mut buf = vec![0; 32];
rand::thread_rng().fill(&mut buf[..]);
// 8082 应该可以连接成功
_tunnel_pingpong_netns(
listener_8082,
connector_8082,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
)
.await;
// 8080 应该连接失败(被 ACL 拦截)
let result = tokio::time::timeout(
std::time::Duration::from_millis(200),
_tunnel_pingpong_netns(
listener_8080,
connector_8080,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
),
)
.await;
let stats = insts[2].get_global_ctx().get_acl_filter().get_stats();
println!("ACL stats after UDP tests: {}", stats);
assert!(
result.is_err(),
"UDP 连接子网代理 8080 应被 ACL 拦截,不能成功"
);
}
// 测试 ICMP 到子网代理(应该被拒绝,因为 Any 协议被拒绝)
tokio::spawn(wait_for_condition(
|| async { ping_test("net_a", "10.1.2.4", None).await },
Duration::from_secs(1),
))
.await
.unwrap_err();
// 移除 ACL 规则
insts[2]
.get_global_ctx()
.get_acl_filter()
.reload_rules(None);
// 验证移除 ACL 后,ICMP 可以正常工作
wait_for_condition(
|| async { ping_test("net_a", "10.1.2.4", None).await },
Duration::from_secs(5),
)
.await;
drop_insts(insts).await;
}