Fix: Fixed compilation issue after partially removing the feature flag (#1835)

This commit is contained in:
Chenx Dust
2026-01-28 21:38:34 +08:00
committed by GitHub
parent 977e502150
commit ccc684a9ab
14 changed files with 512 additions and 239 deletions
+7 -142
View File
@@ -13,19 +13,9 @@ use kcp_sys::{
packet_def::KcpPacket,
stream::KcpStream,
};
use pnet::packet::{
ip::IpNextHeaderProtocols,
ipv4::Ipv4Packet,
tcp::{TcpFlags, TcpPacket},
Packet as _,
};
use pnet::packet::ipv4::Ipv4Packet;
use prost::Message;
use tokio::{
io::{copy_bidirectional, AsyncRead, AsyncWrite},
select,
task::JoinSet,
};
use tokio_util::io::InspectReader;
use tokio::{select, task::JoinSet};
use super::{
tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy},
@@ -37,9 +27,10 @@ use crate::{
error::Result,
global_ctx::{ArcGlobalCtx, GlobalCtx},
},
peers::{acl_filter::AclFilter, peer_manager::PeerManager, NicPacketFilter, PeerPacketFilter},
gateway::wrapped_proxy::{ProxyAclHandler, TcpProxyForWrappedSrcTrait},
peers::{peer_manager::PeerManager, PeerPacketFilter},
proto::{
acl::{Action, ChainType, Protocol},
acl::{ChainType, Protocol},
api::instance::{
ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc,
@@ -215,21 +206,14 @@ impl NatDstConnector for NatDstKcpConnector {
struct TcpProxyForKcpSrc(Arc<TcpProxy<NatDstKcpConnector>>);
#[async_trait::async_trait]
pub(crate) trait TcpProxyForKcpSrcTrait: Send + Sync + 'static {
type Connector: NatDstConnector;
fn get_tcp_proxy(&self) -> &Arc<TcpProxy<Self::Connector>>;
async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool;
}
#[async_trait::async_trait]
impl TcpProxyForKcpSrcTrait for TcpProxyForKcpSrc {
impl TcpProxyForWrappedSrcTrait for TcpProxyForKcpSrc {
type Connector = NatDstKcpConnector;
fn get_tcp_proxy(&self) -> &Arc<TcpProxy<Self::Connector>> {
&self.0
}
async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool {
async fn check_dst_allow_wrapped_input(&self, dst_ip: &Ipv4Addr) -> bool {
let Some(peer_manager) = self.0.get_peer_manager() else {
return false;
};
@@ -239,81 +223,6 @@ impl TcpProxyForKcpSrcTrait for TcpProxyForKcpSrc {
}
}
#[async_trait::async_trait]
impl<C: NatDstConnector, T: TcpProxyForKcpSrcTrait<Connector = C>> NicPacketFilter for T {
async fn try_process_packet_from_nic(&self, zc_packet: &mut ZCPacket) -> bool {
let ret = self
.get_tcp_proxy()
.try_process_packet_from_nic(zc_packet)
.await;
if ret {
return true;
}
let data = zc_packet.payload();
let ip_packet = Ipv4Packet::new(data).unwrap();
if ip_packet.get_version() != 4
|| ip_packet.get_next_level_protocol() != IpNextHeaderProtocols::Tcp
{
return false;
}
// if no connection is established, only allow SYN packet
let tcp_packet = TcpPacket::new(ip_packet.payload()).unwrap();
let is_syn = tcp_packet.get_flags() & TcpFlags::SYN != 0
&& tcp_packet.get_flags() & TcpFlags::ACK == 0;
if is_syn {
// only check dst feature flag when SYN packet
if !self
.check_dst_allow_kcp_input(&ip_packet.get_destination())
.await
{
tracing::warn!(
"{:?} proxy src: dst {} not allow kcp input",
self.get_tcp_proxy().get_transport_type(),
ip_packet.get_destination()
);
return false;
}
} else {
// if not syn packet, only allow established connection
if !self
.get_tcp_proxy()
.is_tcp_proxy_connection(SocketAddr::new(
IpAddr::V4(ip_packet.get_source()),
tcp_packet.get_source(),
))
{
return false;
}
}
if let Some(my_ipv4) = self.get_tcp_proxy().get_global_ctx().get_ipv4() {
// this is a net-to-net packet, only allow it when smoltcp is enabled
// because the syn-ack packet will not be through and handled by the tun device when
// the source ip is in the local network
if ip_packet.get_source() != my_ipv4.address()
&& !self.get_tcp_proxy().is_smoltcp_enabled()
{
tracing::warn!(
"{:?} nat 2 nat packet, src: {} dst: {} not allow kcp input",
self.get_tcp_proxy().get_transport_type(),
ip_packet.get_source(),
ip_packet.get_destination()
);
return false;
}
};
let hdr = zc_packet.mut_peer_manager_header().unwrap();
hdr.to_peer_id = self.get_tcp_proxy().get_my_peer_id().into();
if self.get_tcp_proxy().get_transport_type() == TcpProxyEntryTransportType::Kcp {
hdr.set_kcp_src_modified(true);
}
true
}
}
pub struct KcpProxySrc {
kcp_endpoint: Arc<KcpEndpoint>,
peer_manager: Arc<PeerManager>,
@@ -387,50 +296,6 @@ pub struct KcpProxyDst {
tasks: JoinSet<()>,
}
#[derive(Clone)]
pub struct ProxyAclHandler {
pub acl_filter: Arc<AclFilter>,
pub packet_info: PacketInfo,
pub chain_type: ChainType,
}
impl ProxyAclHandler {
pub fn handle_packet(&self, buf: &[u8]) -> Result<()> {
let mut packet_info = self.packet_info.clone();
packet_info.packet_size = buf.len();
let ret = self
.acl_filter
.get_processor()
.process_packet(&packet_info, self.chain_type);
self.acl_filter.handle_acl_result(
&ret,
&packet_info,
self.chain_type,
&self.acl_filter.get_processor(),
);
if !matches!(ret.action, Action::Allow) {
return Err(anyhow::anyhow!("acl denied").into());
}
Ok(())
}
pub async fn copy_bidirection_with_acl(
&self,
src: impl AsyncRead + AsyncWrite + Unpin,
mut dst: impl AsyncRead + AsyncWrite + Unpin,
) -> Result<()> {
let (src_reader, src_writer) = tokio::io::split(src);
let src_reader = InspectReader::new(src_reader, |buf| {
let _ = self.handle_packet(buf);
});
let mut src = tokio::io::join(src_reader, src_writer);
copy_bidirectional(&mut src, &mut dst).await?;
Ok(())
}
}
impl KcpProxyDst {
pub async fn new(peer_manager: Arc<PeerManager>) -> Self {
let mut kcp_endpoint = create_kcp_endpoint();
+3
View File
@@ -16,8 +16,11 @@ pub mod fast_socks5;
#[cfg(feature = "socks5")]
pub mod socks5;
#[cfg(feature = "kcp")]
pub mod kcp_proxy;
mod wrapped_proxy;
#[cfg(feature = "quic")]
pub mod quic_proxy;
#[derive(Debug)]
+3 -3
View File
@@ -17,8 +17,8 @@ use crate::common::error::Result;
use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx};
use crate::common::join_joinset_background;
use crate::defer;
use crate::gateway::kcp_proxy::{ProxyAclHandler, TcpProxyForKcpSrcTrait};
use crate::gateway::tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy};
use crate::gateway::wrapped_proxy::{ProxyAclHandler, TcpProxyForWrappedSrcTrait};
use crate::gateway::CidrSet;
use crate::peers::peer_manager::PeerManager;
use crate::proto::acl::{ChainType, Protocol};
@@ -184,14 +184,14 @@ impl NatDstConnector for NatDstQUICConnector {
struct TcpProxyForQUICSrc(Arc<TcpProxy<NatDstQUICConnector>>);
#[async_trait::async_trait]
impl TcpProxyForKcpSrcTrait for TcpProxyForQUICSrc {
impl TcpProxyForWrappedSrcTrait for TcpProxyForQUICSrc {
type Connector = NatDstQUICConnector;
fn get_tcp_proxy(&self) -> &Arc<TcpProxy<Self::Connector>> {
&self.0
}
async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool {
async fn check_dst_allow_wrapped_input(&self, dst_ip: &Ipv4Addr) -> bool {
let Some(peer_manager) = self.0.get_peer_manager() else {
return false;
};
+29 -3
View File
@@ -9,9 +9,12 @@ use std::{
};
use crossbeam::atomic::AtomicCell;
#[cfg(feature = "kcp")]
use kcp_sys::{endpoint::KcpEndpoint, stream::KcpStream};
use tokio_util::sync::{CancellationToken, DropGuard};
#[cfg(feature = "kcp")]
use crate::gateway::kcp_proxy::NatDstKcpConnector;
use crate::{
common::{
config::PortForwardConfig, global_ctx::GlobalCtxEvent, join_joinset_background,
@@ -25,7 +28,6 @@ use crate::{
util::stream::tcp_connect_with_timeout,
},
ip_reassembler::IpReassembler,
kcp_proxy::NatDstKcpConnector,
tokio_smoltcp::{channel_device, BufferSize, Net, NetConfig},
},
tunnel::{
@@ -52,6 +54,7 @@ use crate::{
peers::{peer_manager::PeerManager, PeerPacketFilter},
};
#[cfg(feature = "kcp")]
use super::tcp_proxy::NatDstConnector as _;
enum SocksUdpSocket {
@@ -78,6 +81,7 @@ impl SocksUdpSocket {
enum SocksTcpStream {
Tcp(tokio::net::TcpStream),
SmolTcp(super::tokio_smoltcp::TcpStream),
#[cfg(feature = "kcp")]
Kcp(KcpStream),
}
@@ -92,6 +96,7 @@ impl AsyncRead for SocksTcpStream {
SocksTcpStream::SmolTcp(ref mut stream) => {
std::pin::Pin::new(stream).poll_read(cx, buf)
}
#[cfg(feature = "kcp")]
SocksTcpStream::Kcp(ref mut stream) => std::pin::Pin::new(stream).poll_read(cx, buf),
}
}
@@ -108,6 +113,7 @@ impl AsyncWrite for SocksTcpStream {
SocksTcpStream::SmolTcp(ref mut stream) => {
std::pin::Pin::new(stream).poll_write(cx, buf)
}
#[cfg(feature = "kcp")]
SocksTcpStream::Kcp(ref mut stream) => std::pin::Pin::new(stream).poll_write(cx, buf),
}
}
@@ -119,6 +125,7 @@ impl AsyncWrite for SocksTcpStream {
match self.get_mut() {
SocksTcpStream::Tcp(ref mut stream) => std::pin::Pin::new(stream).poll_flush(cx),
SocksTcpStream::SmolTcp(ref mut stream) => std::pin::Pin::new(stream).poll_flush(cx),
#[cfg(feature = "kcp")]
SocksTcpStream::Kcp(ref mut stream) => std::pin::Pin::new(stream).poll_flush(cx),
}
}
@@ -130,6 +137,7 @@ impl AsyncWrite for SocksTcpStream {
match self.get_mut() {
SocksTcpStream::Tcp(ref mut stream) => std::pin::Pin::new(stream).poll_shutdown(cx),
SocksTcpStream::SmolTcp(ref mut stream) => std::pin::Pin::new(stream).poll_shutdown(cx),
#[cfg(feature = "kcp")]
SocksTcpStream::Kcp(ref mut stream) => std::pin::Pin::new(stream).poll_shutdown(cx),
}
}
@@ -211,12 +219,14 @@ impl Drop for SmolTcpConnector {
}
}
#[cfg(feature = "kcp")]
struct Socks5KcpConnector {
kcp_endpoint: Weak<KcpEndpoint>,
peer_mgr: Weak<PeerManager>,
src_addr: SocketAddr,
}
#[cfg(feature = "kcp")]
#[async_trait::async_trait]
impl AsyncTcpConnector for Socks5KcpConnector {
type S = SocksTcpStream;
@@ -242,6 +252,7 @@ impl AsyncTcpConnector for Socks5KcpConnector {
}
struct Socks5AutoConnector {
#[cfg(feature = "kcp")]
kcp_endpoint: Option<Weak<KcpEndpoint>>,
peer_mgr: Weak<PeerManager>,
entries: Socks5EntrySet,
@@ -288,6 +299,7 @@ impl AsyncTcpConnector for Socks5AutoConnector {
let dst_allow_kcp = peer_mgr_arc.check_allow_kcp_to_dst(&addr.ip()).await;
tracing::debug!("dst_allow_kcp: {:?}", dst_allow_kcp);
#[cfg(feature = "kcp")]
let connector: Box<dyn AsyncTcpConnector<S = SocksTcpStream> + Send> =
match (&self.kcp_endpoint, dst_allow_kcp) {
(Some(kcp_endpoint), true) => Box::new(Socks5KcpConnector {
@@ -301,6 +313,12 @@ impl AsyncTcpConnector for Socks5AutoConnector {
current_entry: std::sync::Mutex::new(None),
}),
};
#[cfg(not(feature = "kcp"))]
let connector = Box::new(SmolTcpConnector {
net: self.smoltcp_net.clone().unwrap(),
entries: self.entries.clone(),
current_entry: std::sync::Mutex::new(None),
});
let ret = connector.tcp_connect(addr, timeout_s).await;
self.inner_connector.lock().replace(Box::new(connector));
@@ -490,6 +508,7 @@ pub struct Socks5Server {
udp_client_map: Arc<DashMap<UdpClientKey, Arc<UdpClientInfo>>>,
udp_forward_task: Arc<DashMap<UdpClientKey, ScopedTask<()>>>,
#[cfg(feature = "kcp")]
kcp_endpoint: Mutex<Option<Weak<KcpEndpoint>>>,
socks5_enabled: Arc<AtomicBool>,
@@ -603,6 +622,7 @@ impl Socks5Server {
udp_client_map: Arc::new(DashMap::new()),
udp_forward_task: Arc::new(DashMap::new()),
#[cfg(feature = "kcp")]
kcp_endpoint: Mutex::new(None),
socks5_enabled: Arc::new(AtomicBool::new(false)),
@@ -662,9 +682,12 @@ impl Socks5Server {
pub async fn run(
self: &Arc<Self>,
kcp_endpoint: Option<Weak<KcpEndpoint>>,
#[cfg(feature = "kcp")] kcp_endpoint: Option<Weak<KcpEndpoint>>,
) -> Result<(), Error> {
*self.kcp_endpoint.lock().await = kcp_endpoint.clone();
#[cfg(feature = "kcp")]
{
*self.kcp_endpoint.lock().await = kcp_endpoint.clone();
}
if let Some(proxy_url) = self.global_ctx.config.get_socks5_portal() {
let bind_addr = format!(
"{}:{}",
@@ -692,6 +715,7 @@ impl Socks5Server {
.as_ref()
.map(|net| net.smoltcp_net.clone()),
entries: entries.clone(),
#[cfg(feature = "kcp")]
kcp_endpoint: kcp_endpoint.clone(),
peer_mgr: peer_manager.clone(),
src_addr: addr,
@@ -811,6 +835,7 @@ impl Socks5Server {
let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new()));
join_joinset_background(tasks.clone(), "tcp port forward".to_string());
let forward_tasks = tasks;
#[cfg(feature = "kcp")]
let kcp_endpoint = self.kcp_endpoint.lock().await.clone();
let peer_mgr = self.peer_manager.clone();
let cancel_token = CancellationToken::new();
@@ -843,6 +868,7 @@ impl Socks5Server {
);
let connector = Socks5AutoConnector {
#[cfg(feature = "kcp")]
kcp_endpoint: kcp_endpoint.clone(),
peer_mgr: peer_mgr.clone(),
entries: entries.clone(),
+4
View File
@@ -201,6 +201,7 @@ impl ProxyTcpStream {
}
}
#[cfg(feature = "smoltcp")]
type SmolTcpAcceptResult = Result<(tokio_smoltcp::TcpStream, SocketAddr)>;
#[cfg(feature = "smoltcp")]
struct SmolTcpListener {
@@ -331,6 +332,7 @@ pub struct TcpProxy<C: NatDstConnector> {
smoltcp_stack_receiver: Arc<Mutex<Option<mpsc::Receiver<ZCPacket>>>>,
#[cfg(feature = "smoltcp")]
smoltcp_net: Arc<Mutex<Option<Net>>>,
#[cfg(feature = "smoltcp")]
smoltcp_listener_tx: std::sync::Mutex<Option<mpsc::UnboundedSender<SmolTcpAcceptResult>>>,
enable_smoltcp: Arc<AtomicBool>,
@@ -461,6 +463,7 @@ impl<C: NatDstConnector> TcpProxy<C> {
#[cfg(feature = "smoltcp")]
smoltcp_net: Arc::new(Mutex::new(None)),
#[cfg(feature = "smoltcp")]
smoltcp_listener_tx: std::sync::Mutex::new(None),
enable_smoltcp: Arc::new(AtomicBool::new(true)),
@@ -930,6 +933,7 @@ impl<C: NatDstConnector> TcpProxy<C> {
tracing::info!(src = ?src, ?real_dst, ?mapped_dst, old_entry = ?old_val, "tcp syn received");
// if smoltcp is enabled, add the listener to the net
#[cfg(feature = "smoltcp")]
if self.is_smoltcp_enabled() {
let smoltcp_listener_tx = self.smoltcp_listener_tx.lock().unwrap().clone().unwrap();
SmolTcpListener::add_listener(
+150
View File
@@ -0,0 +1,150 @@
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
};
use pnet::packet::{
ip::IpNextHeaderProtocols,
ipv4::Ipv4Packet,
tcp::{TcpFlags, TcpPacket},
Packet as _,
};
use tokio::io::{copy_bidirectional, AsyncRead, AsyncWrite};
use tokio_util::io::InspectReader;
use crate::{
common::{acl_processor::PacketInfo, error::Result},
gateway::tcp_proxy::{NatDstConnector, TcpProxy},
peers::{acl_filter::AclFilter, NicPacketFilter},
proto::{
acl::{Action, ChainType},
api::instance::TcpProxyEntryTransportType,
},
tunnel::packet_def::ZCPacket,
};
#[derive(Clone)]
pub struct ProxyAclHandler {
pub acl_filter: Arc<AclFilter>,
pub packet_info: PacketInfo,
pub chain_type: ChainType,
}
impl ProxyAclHandler {
pub fn handle_packet(&self, buf: &[u8]) -> Result<()> {
let mut packet_info = self.packet_info.clone();
packet_info.packet_size = buf.len();
let ret = self
.acl_filter
.get_processor()
.process_packet(&packet_info, self.chain_type);
self.acl_filter.handle_acl_result(
&ret,
&packet_info,
self.chain_type,
&self.acl_filter.get_processor(),
);
if !matches!(ret.action, Action::Allow) {
return Err(anyhow::anyhow!("acl denied").into());
}
Ok(())
}
pub async fn copy_bidirection_with_acl(
&self,
src: impl AsyncRead + AsyncWrite + Unpin,
mut dst: impl AsyncRead + AsyncWrite + Unpin,
) -> Result<()> {
let (src_reader, src_writer) = tokio::io::split(src);
let src_reader = InspectReader::new(src_reader, |buf| {
let _ = self.handle_packet(buf);
});
let mut src = tokio::io::join(src_reader, src_writer);
copy_bidirectional(&mut src, &mut dst).await?;
Ok(())
}
}
#[async_trait::async_trait]
pub(crate) trait TcpProxyForWrappedSrcTrait: Send + Sync + 'static {
type Connector: NatDstConnector;
fn get_tcp_proxy(&self) -> &Arc<TcpProxy<Self::Connector>>;
async fn check_dst_allow_wrapped_input(&self, dst_ip: &Ipv4Addr) -> bool;
}
#[async_trait::async_trait]
impl<C: NatDstConnector, T: TcpProxyForWrappedSrcTrait<Connector = C>> NicPacketFilter for T {
async fn try_process_packet_from_nic(&self, zc_packet: &mut ZCPacket) -> bool {
let ret = self
.get_tcp_proxy()
.try_process_packet_from_nic(zc_packet)
.await;
if ret {
return true;
}
let data = zc_packet.payload();
let ip_packet = Ipv4Packet::new(data).unwrap();
if ip_packet.get_version() != 4
|| ip_packet.get_next_level_protocol() != IpNextHeaderProtocols::Tcp
{
return false;
}
// if no connection is established, only allow SYN packet
let tcp_packet = TcpPacket::new(ip_packet.payload()).unwrap();
let is_syn = tcp_packet.get_flags() & TcpFlags::SYN != 0
&& tcp_packet.get_flags() & TcpFlags::ACK == 0;
if is_syn {
// only check dst feature flag when SYN packet
if !self
.check_dst_allow_wrapped_input(&ip_packet.get_destination())
.await
{
tracing::warn!(
"{:?} proxy src: dst {} not allow wrapped input",
self.get_tcp_proxy().get_transport_type(),
ip_packet.get_destination()
);
return false;
}
} else {
// if not syn packet, only allow established connection
if !self
.get_tcp_proxy()
.is_tcp_proxy_connection(SocketAddr::new(
IpAddr::V4(ip_packet.get_source()),
tcp_packet.get_source(),
))
{
return false;
}
}
if let Some(my_ipv4) = self.get_tcp_proxy().get_global_ctx().get_ipv4() {
// this is a net-to-net packet, only allow it when smoltcp is enabled
// because the syn-ack packet will not be through and handled by the tun device when
// the source ip is in the local network
if ip_packet.get_source() != my_ipv4.address()
&& !self.get_tcp_proxy().is_smoltcp_enabled()
{
tracing::warn!(
"{:?} nat 2 nat packet, src: {} dst: {} not allow wrapped input",
self.get_tcp_proxy().get_transport_type(),
ip_packet.get_source(),
ip_packet.get_destination()
);
return false;
}
};
let hdr = zc_packet.mut_peer_manager_header().unwrap();
hdr.to_peer_id = self.get_tcp_proxy().get_my_peer_id().into();
if self.get_tcp_proxy().get_transport_type() == TcpProxyEntryTransportType::Kcp {
hdr.set_kcp_src_modified(true);
}
true
}
}