Compare commits

...

4 Commits

Author SHA1 Message Date
fanyang 9d7a938e93 Address review comments 2026-05-04 10:42:51 +08:00
fanyang 6229229b31 feat: support lzo compression 2026-05-04 10:42:51 +08:00
fanyang 6a63853bad fix: silence listener warning in feature builds 2026-05-04 10:42:51 +08:00
fanyang 362aa7a9cd fix: allow omitted ACL config fields (#2206) 2026-05-04 00:47:24 +08:00
12 changed files with 233 additions and 25 deletions
Generated
+11
View File
@@ -2288,6 +2288,7 @@ dependencies = [
"indoc", "indoc",
"itertools 0.14.0", "itertools 0.14.0",
"kcp-sys", "kcp-sys",
"lzokay-native",
"machine-uid", "machine-uid",
"maplit", "maplit",
"mimalloc", "mimalloc",
@@ -4874,6 +4875,16 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "lzokay-native"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "792ba667add2798c6c3e988e630f4eb921b5cbc735044825b7111ef1582c8730"
dependencies = [
"byteorder",
"thiserror 1.0.63",
]
[[package]] [[package]]
name = "mac" name = "mac"
version = "0.1.1" version = "0.1.1"
+4
View File
@@ -221,6 +221,7 @@ async-ringbuf = "0.3.1"
service-manager = { git = "https://github.com/EasyTier/service-manager-rs.git", branch = "main" } service-manager = { git = "https://github.com/EasyTier/service-manager-rs.git", branch = "main" }
zstd = { version = "0.13", optional = true } zstd = { version = "0.13", optional = true }
lzokay-native = { version = "0.1", optional = true }
kcp-sys = { git = "https://github.com/EasyTier/kcp-sys", rev = "94964794caaed5d388463137da59b97499619e5f", optional = true } kcp-sys = { git = "https://github.com/EasyTier/kcp-sys", rev = "94964794caaed5d388463137da59b97499619e5f", optional = true }
@@ -358,6 +359,7 @@ default = [
"faketcp", "faketcp",
"magic-dns", "magic-dns",
"zstd", "zstd",
"lzo",
] ]
full = [ full = [
"websocket", "websocket",
@@ -372,6 +374,7 @@ full = [
"faketcp", "faketcp",
"magic-dns", "magic-dns",
"zstd", "zstd",
"lzo",
] ]
wireguard = ["dep:boringtun", "dep:ring"] wireguard = ["dep:boringtun", "dep:ring"]
quic = ["dep:quinn", "dep:quinn-plaintext", "dep:rustls", "dep:rcgen"] quic = ["dep:quinn", "dep:quinn-plaintext", "dep:rustls", "dep:rcgen"]
@@ -402,5 +405,6 @@ tracing = ["tokio/tracing", "dep:console-subscriber"]
magic-dns = ["dep:hickory-client", "dep:hickory-server"] magic-dns = ["dep:hickory-client", "dep:hickory-server"]
faketcp = ["dep:flume"] faketcp = ["dep:flume"]
zstd = ["dep:zstd"] zstd = ["dep:zstd"]
lzo = ["dep:lzokay-native"]
# For Network Extension on macOS # For Network Extension on macOS
macos-ne = [] macos-ne = []
+5
View File
@@ -191,6 +191,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
) )
.type_attribute("peer_rpc.RouteForeignNetworkSummary", "#[derive(Hash, Eq)]") .type_attribute("peer_rpc.RouteForeignNetworkSummary", "#[derive(Hash, Eq)]")
.type_attribute("common.RpcDescriptor", "#[derive(Hash, Eq)]") .type_attribute("common.RpcDescriptor", "#[derive(Hash, Eq)]")
.type_attribute("acl.Acl", "#[serde(default)]")
.type_attribute("acl.AclV1", "#[serde(default)]")
.type_attribute("acl.Chain", "#[serde(default)]")
.type_attribute("acl.Rule", "#[serde(default)]")
.type_attribute("acl.GroupInfo", "#[serde(default)]")
.field_attribute(".api.manage.NetworkConfig", "#[serde(default)]") .field_attribute(".api.manage.NetworkConfig", "#[serde(default)]")
.service_generator(Box::new(easytier_rpc_build::ServiceGenerator::default())) .service_generator(Box::new(easytier_rpc_build::ServiceGenerator::default()))
.btree_map(["."]) .btree_map(["."])
+2 -2
View File
@@ -194,8 +194,8 @@ core_clap:
en: "the url of the ipv6 listener, e.g.: tcp://[::]:11010, if not set, will listen on random udp port" en: "the url of the ipv6 listener, e.g.: tcp://[::]:11010, if not set, will listen on random udp port"
zh-CN: "IPv6 监听器的URL,例如:tcp://[::]:11010,如果未设置,将在随机UDP端口上监听" zh-CN: "IPv6 监听器的URL,例如:tcp://[::]:11010,如果未设置,将在随机UDP端口上监听"
compression: compression:
en: "compression algorithm to use, support none, zstd. default is none" en: "compression algorithm to use, supported: %{algorithms}. default is none"
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none" zh-CN: "要使用的压缩算法,支持%{algorithms}。默认为 none"
mapped_listeners: mapped_listeners:
en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple." en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple."
zh-CN: "手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。例如:tcp://123.123.123.123:11223,可以指定多个。" zh-CN: "手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。例如:tcp://123.123.123.123:11223,可以指定多个。"
+39
View File
@@ -1339,6 +1339,45 @@ mod tests {
assert_eq!(result.matched_rule, Some(RuleId::Priority(70))); assert_eq!(result.matched_rule, Some(RuleId::Priority(70)));
} }
#[tokio::test]
async fn test_forward_acl_source_ip_whitelist() {
let mut acl_config = Acl::default();
let mut acl_v1 = AclV1::default();
let mut chain = Chain {
name: "subnet_proxy_protect".to_string(),
chain_type: ChainType::Forward as i32,
enabled: true,
default_action: Action::Drop as i32,
..Default::default()
};
chain.rules.push(Rule {
name: "allow_my_devices".to_string(),
priority: 1000,
enabled: true,
action: Action::Allow as i32,
protocol: Protocol::Any as i32,
source_ips: vec!["10.172.192.2/32".to_string()],
..Default::default()
});
acl_v1.chains.push(chain);
acl_config.acl_v1 = Some(acl_v1);
let processor = AclProcessor::new(acl_config);
let mut packet_info = create_test_packet_info();
packet_info.dst_ip = "192.168.1.10".parse().unwrap();
packet_info.src_ip = "10.172.192.2".parse().unwrap();
let result = processor.process_packet(&packet_info, ChainType::Forward);
assert_eq!(result.action, Action::Allow);
assert_eq!(result.matched_rule, Some(RuleId::Priority(1000)));
packet_info.src_ip = "10.172.192.3".parse().unwrap();
let result = processor.process_packet(&packet_info, ChainType::Forward);
assert_eq!(result.action, Action::Drop);
assert_eq!(result.matched_rule, Some(RuleId::Default));
}
fn create_test_acl_config() -> Acl { fn create_test_acl_config() -> Acl {
let mut acl_config = Acl::default(); let mut acl_config = Acl::default();
+46 -10
View File
@@ -1,4 +1,4 @@
#[cfg(feature = "zstd")] #[cfg(any(feature = "zstd", feature = "lzo"))]
use anyhow::Context; use anyhow::Context;
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
use dashmap::DashMap; use dashmap::DashMap;
@@ -53,6 +53,13 @@ impl DefaultCompressor {
) )
}) })
}), }),
#[cfg(feature = "lzo")]
CompressorAlgo::Lzo => lzokay_native::compress(data).with_context(|| {
format!(
"Failed to compress data with algorithm: {:?}",
compress_algo
)
}),
CompressorAlgo::None => Ok(data.to_vec()), CompressorAlgo::None => Ok(data.to_vec()),
} }
} }
@@ -85,6 +92,13 @@ impl DefaultCompressor {
compress_algo compress_algo
)) ))
}), }),
#[cfg(feature = "lzo")]
CompressorAlgo::Lzo => lzokay_native::decompress_all(data, None).with_context(|| {
format!(
"Failed to decompress data with algorithm: {:?}",
compress_algo
)
}),
CompressorAlgo::None => Ok(data.to_vec()), CompressorAlgo::None => Ok(data.to_vec()),
} }
} }
@@ -181,14 +195,13 @@ thread_local! {
static DCTX_MAP: RefCell<DashMap<CompressorAlgo, bulk::Decompressor<'static>>> = RefCell::new(DashMap::new()); static DCTX_MAP: RefCell<DashMap<CompressorAlgo, bulk::Decompressor<'static>>> = RefCell::new(DashMap::new());
} }
#[cfg(all(test, feature = "zstd"))] #[cfg(all(test, any(feature = "zstd", feature = "lzo")))]
pub mod tests { pub mod tests {
use super::*; use super::*;
#[tokio::test] async fn test_compress_algo(compress_algo: CompressorAlgo) {
async fn test_compress() { let text = vec![b'a'; 4096];
let text = b"12345670000000000000000000"; let mut packet = ZCPacket::new_with_payload(&text);
let mut packet = ZCPacket::new_with_payload(text);
packet.fill_peer_manager_hdr(0, 0, 0); packet.fill_peer_manager_hdr(0, 0, 0);
let compressor = DefaultCompressor {}; let compressor = DefaultCompressor {};
@@ -200,7 +213,7 @@ pub mod tests {
); );
compressor compressor
.compress(&mut packet, CompressorAlgo::ZstdDefault) .compress(&mut packet, compress_algo)
.await .await
.unwrap(); .unwrap();
println!( println!(
@@ -215,8 +228,7 @@ pub mod tests {
assert!(!packet.peer_manager_header().unwrap().is_compressed()); assert!(!packet.peer_manager_header().unwrap().is_compressed());
} }
#[tokio::test] async fn test_short_text_compress_algo(compress_algo: CompressorAlgo) {
async fn test_short_text_compress() {
let text = b"1234"; let text = b"1234";
let mut packet = ZCPacket::new_with_payload(text); let mut packet = ZCPacket::new_with_payload(text);
packet.fill_peer_manager_hdr(0, 0, 0); packet.fill_peer_manager_hdr(0, 0, 0);
@@ -225,7 +237,7 @@ pub mod tests {
// short text can't be compressed // short text can't be compressed
compressor compressor
.compress(&mut packet, CompressorAlgo::ZstdDefault) .compress(&mut packet, compress_algo)
.await .await
.unwrap(); .unwrap();
assert!(!packet.peer_manager_header().unwrap().is_compressed()); assert!(!packet.peer_manager_header().unwrap().is_compressed());
@@ -234,4 +246,28 @@ pub mod tests {
assert_eq!(packet.payload(), text); assert_eq!(packet.payload(), text);
assert!(!packet.peer_manager_header().unwrap().is_compressed()); assert!(!packet.peer_manager_header().unwrap().is_compressed());
} }
#[cfg(feature = "zstd")]
#[tokio::test]
async fn test_zstd_compress() {
test_compress_algo(CompressorAlgo::ZstdDefault).await;
}
#[cfg(feature = "zstd")]
#[tokio::test]
async fn test_zstd_short_text_compress() {
test_short_text_compress_algo(CompressorAlgo::ZstdDefault).await;
}
#[cfg(feature = "lzo")]
#[tokio::test]
async fn test_lzo_compress() {
test_compress_algo(CompressorAlgo::Lzo).await;
}
#[cfg(feature = "lzo")]
#[tokio::test]
async fn test_lzo_short_text_compress() {
test_short_text_compress_algo(CompressorAlgo::Lzo).await;
}
} }
+65
View File
@@ -1337,6 +1337,71 @@ stun_servers = [
assert!(err.to_string().contains("mapped listener port is missing")); assert!(err.to_string().contains("mapped listener port is missing"));
} }
#[test]
fn test_acl_toml_rule_uses_defaults_for_omitted_fields() {
use crate::proto::acl::{Action, ChainType, Protocol};
let config_str = r#"
[[acl.acl_v1.chains]]
name = "subnet_proxy_protect"
chain_type = 3
enabled = true
default_action = 2
[[acl.acl_v1.chains.rules]]
name = "allow_my_devices"
priority = 1000
action = 1
source_ips = ["10.172.192.2/32"]
protocol = 5
enabled = true
"#;
let config = TomlConfigLoader::new_from_str(config_str).unwrap();
let acl = config.get_acl().unwrap();
let acl_v1 = acl.acl_v1.unwrap();
let chain = &acl_v1.chains[0];
let rule = &chain.rules[0];
assert_eq!(chain.chain_type, ChainType::Forward as i32);
assert_eq!(chain.default_action, Action::Drop as i32);
assert_eq!(rule.action, Action::Allow as i32);
assert_eq!(rule.protocol, Protocol::Any as i32);
assert_eq!(rule.source_ips, vec!["10.172.192.2/32"]);
assert!(rule.ports.is_empty());
assert!(rule.source_ports.is_empty());
assert!(rule.destination_ips.is_empty());
assert!(rule.source_groups.is_empty());
assert!(rule.destination_groups.is_empty());
assert_eq!(rule.rate_limit, 0);
assert_eq!(rule.burst_limit, 0);
assert!(!rule.stateful);
}
#[test]
fn test_acl_toml_group_can_omit_declares_or_members() {
let declares_only = r#"
[acl.acl_v1.group]
[[acl.acl_v1.group.declares]]
group_name = "admin"
group_secret = "admin-pw"
"#;
let config = TomlConfigLoader::new_from_str(declares_only).unwrap();
let group = config.get_acl().unwrap().acl_v1.unwrap().group.unwrap();
assert_eq!(group.declares.len(), 1);
assert!(group.members.is_empty());
let members_only = r#"
[acl.acl_v1.group]
members = ["admin"]
"#;
let config = TomlConfigLoader::new_from_str(members_only).unwrap();
let group = config.get_acl().unwrap().acl_v1.unwrap().group.unwrap();
assert!(group.declares.is_empty());
assert_eq!(group.members, vec!["admin"]);
}
#[test] #[test]
fn test_network_config_source_user_is_implicit() { fn test_network_config_source_user_is_implicit() {
let config = TomlConfigLoader::default(); let config = TomlConfigLoader::default();
+49 -10
View File
@@ -37,6 +37,38 @@ use crate::tunnel::IpScheme;
#[cfg(feature = "jemalloc-prof")] #[cfg(feature = "jemalloc-prof")]
use jemalloc_ctl::{Access as _, AsName as _, epoch, stats}; use jemalloc_ctl::{Access as _, AsName as _, epoch, stats};
fn supported_compression_algorithms() -> &'static str {
cfg_select! {
all(feature = "zstd", feature = "lzo") => "none, zstd, lzo",
feature = "zstd" => "none, zstd",
feature = "lzo" => "none, lzo",
_ => "none",
}
}
fn compression_help() -> String {
t!(
"core_clap.compression",
algorithms = supported_compression_algorithms()
)
.to_string()
}
fn parse_compression_algorithm(compression: &str) -> anyhow::Result<CompressionAlgoPb> {
match compression {
"none" => Ok(CompressionAlgoPb::None),
#[cfg(feature = "zstd")]
"zstd" => Ok(CompressionAlgoPb::Zstd),
#[cfg(feature = "lzo")]
"lzo" => Ok(CompressionAlgoPb::Lzo),
_ => anyhow::bail!(
"unknown compression algorithm: {}, supported: {}",
compression,
supported_compression_algorithms()
),
}
}
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
windows_service::define_windows_service!(ffi_service_main, win_service_main); windows_service::define_windows_service!(ffi_service_main, win_service_main);
@@ -513,7 +545,7 @@ struct NetworkOptions {
#[arg( #[arg(
long, long,
env = "ET_COMPRESSION", env = "ET_COMPRESSION",
help = t!("core_clap.compression").to_string(), help = compression_help(),
)] )]
compression: Option<String>, compression: Option<String>,
@@ -1106,15 +1138,7 @@ impl NetworkOptions {
f.need_p2p = self.need_p2p.unwrap_or(f.need_p2p); f.need_p2p = self.need_p2p.unwrap_or(f.need_p2p);
f.multi_thread = self.multi_thread.unwrap_or(f.multi_thread); f.multi_thread = self.multi_thread.unwrap_or(f.multi_thread);
if let Some(compression) = &self.compression { if let Some(compression) = &self.compression {
f.data_compress_algo = match compression.as_str() { f.data_compress_algo = parse_compression_algorithm(compression)?.into();
"none" => CompressionAlgoPb::None,
"zstd" => CompressionAlgoPb::Zstd,
_ => panic!(
"unknown compression algorithm: {}, supported: none, zstd",
compression
),
}
.into();
} }
f.bind_device = self.bind_device.unwrap_or(f.bind_device); f.bind_device = self.bind_device.unwrap_or(f.bind_device);
f.enable_kcp_proxy = self.enable_kcp_proxy.unwrap_or(f.enable_kcp_proxy); f.enable_kcp_proxy = self.enable_kcp_proxy.unwrap_or(f.enable_kcp_proxy);
@@ -1627,6 +1651,21 @@ async fn validate_config(cli: &Cli) -> anyhow::Result<()> {
mod tests { mod tests {
use super::*; use super::*;
#[test]
fn test_compression_help_uses_supported_algorithms() {
assert!(compression_help().contains(supported_compression_algorithms()));
}
#[test]
fn test_parse_compression_algorithm_rejects_unknown() {
let err = parse_compression_algorithm("snappy")
.unwrap_err()
.to_string();
assert!(err.contains("snappy"));
assert!(err.contains(supported_compression_algorithms()));
}
#[test] #[test]
fn test_parse_listeners() { fn test_parse_listeners() {
type IpSchemeMap = fn(&IpScheme) -> String; type IpSchemeMap = fn(&IpScheme) -> String;
+3 -3
View File
@@ -25,7 +25,7 @@ use crate::{
pub fn create_listener_by_url( pub fn create_listener_by_url(
l: &url::Url, l: &url::Url,
global_ctx: ArcGlobalCtx, _global_ctx: ArcGlobalCtx,
) -> Result<Box<dyn TunnelListener>, Error> { ) -> Result<Box<dyn TunnelListener>, Error> {
Ok(match l.try_into()? { Ok(match l.try_into()? {
TunnelScheme::Ip(scheme) => match scheme { TunnelScheme::Ip(scheme) => match scheme {
@@ -34,7 +34,7 @@ pub fn create_listener_by_url(
#[cfg(feature = "wireguard")] #[cfg(feature = "wireguard")]
IpScheme::Wg => { IpScheme::Wg => {
use crate::tunnel::wireguard::{WgConfig, WgTunnelListener}; use crate::tunnel::wireguard::{WgConfig, WgTunnelListener};
let nid = global_ctx.get_network_identity(); let nid = _global_ctx.get_network_identity();
let wg_config = WgConfig::new_from_network_identity( let wg_config = WgConfig::new_from_network_identity(
&nid.network_name, &nid.network_name,
&nid.network_secret.unwrap_or_default(), &nid.network_secret.unwrap_or_default(),
@@ -43,7 +43,7 @@ pub fn create_listener_by_url(
} }
#[cfg(feature = "quic")] #[cfg(feature = "quic")]
IpScheme::Quic => { IpScheme::Quic => {
tunnel::quic::QuicTunnelListener::new(l.clone(), global_ctx.clone()).boxed() tunnel::quic::QuicTunnelListener::new(l.clone(), _global_ctx.clone()).boxed()
} }
#[cfg(feature = "websocket")] #[cfg(feature = "websocket")]
IpScheme::Ws | IpScheme::Wss => { IpScheme::Ws | IpScheme::Wss => {
+1
View File
@@ -105,6 +105,7 @@ enum CompressionAlgoPb {
Invalid = 0; Invalid = 0;
None = 1; None = 1;
Zstd = 2; Zstd = 2;
Lzo = 3;
} }
message RpcCompressionInfo { message RpcCompressionInfo {
+4
View File
@@ -467,6 +467,8 @@ impl TryFrom<CompressionAlgoPb> for CompressorAlgo {
match value { match value {
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
CompressionAlgoPb::Zstd => Ok(CompressorAlgo::ZstdDefault), CompressionAlgoPb::Zstd => Ok(CompressorAlgo::ZstdDefault),
#[cfg(feature = "lzo")]
CompressionAlgoPb::Lzo => Ok(CompressorAlgo::Lzo),
CompressionAlgoPb::None => Ok(CompressorAlgo::None), CompressionAlgoPb::None => Ok(CompressorAlgo::None),
_ => Err(anyhow::anyhow!("Invalid CompressionAlgoPb")), _ => Err(anyhow::anyhow!("Invalid CompressionAlgoPb")),
} }
@@ -480,6 +482,8 @@ impl TryFrom<CompressorAlgo> for CompressionAlgoPb {
match value { match value {
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
CompressorAlgo::ZstdDefault => Ok(CompressionAlgoPb::Zstd), CompressorAlgo::ZstdDefault => Ok(CompressionAlgoPb::Zstd),
#[cfg(feature = "lzo")]
CompressorAlgo::Lzo => Ok(CompressionAlgoPb::Lzo),
CompressorAlgo::None => Ok(CompressionAlgoPb::None), CompressorAlgo::None => Ok(CompressionAlgoPb::None),
} }
} }
+4
View File
@@ -309,6 +309,8 @@ pub enum CompressorAlgo {
None = 0, None = 0,
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
ZstdDefault = 1, ZstdDefault = 1,
#[cfg(feature = "lzo")]
Lzo = 2,
} }
#[repr(C, packed)] #[repr(C, packed)]
@@ -323,6 +325,8 @@ impl CompressorTail {
match self.algo { match self.algo {
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
1 => Some(CompressorAlgo::ZstdDefault), 1 => Some(CompressorAlgo::ZstdDefault),
#[cfg(feature = "lzo")]
2 => Some(CompressorAlgo::Lzo),
_ => None, _ => None,
} }
} }