Compare commits

..

3 Commits

Author SHA1 Message Date
fanyang 9d7a938e93 Address review comments 2026-05-04 10:42:51 +08:00
fanyang 6229229b31 feat: support lzo compression 2026-05-04 10:42:51 +08:00
fanyang 6a63853bad fix: silence listener warning in feature builds 2026-05-04 10:42:51 +08:00
46 changed files with 327 additions and 2355 deletions
+1 -4
View File
@@ -157,9 +157,6 @@ jobs:
- uses: mlugg/setup-zig@v2 - uses: mlugg/setup-zig@v2
if: ${{ contains(matrix.OS, 'ubuntu') }} if: ${{ contains(matrix.OS, 'ubuntu') }}
with:
version: 0.16.0
use-cache: true
- uses: taiki-e/install-action@v2 - uses: taiki-e/install-action@v2
if: ${{ contains(matrix.OS, 'ubuntu') }} if: ${{ contains(matrix.OS, 'ubuntu') }}
@@ -230,7 +227,7 @@ jobs:
*) UPX_ARCH="amd64" ;; *) UPX_ARCH="amd64" ;;
esac esac
UPX_VERSION=4.2.4 UPX_VERSION=5.1.1
UPX_PKG="upx-${UPX_VERSION}-${UPX_ARCH}_linux" UPX_PKG="upx-${UPX_VERSION}-${UPX_ARCH}_linux"
curl -L "https://github.com/upx/upx/releases/download/v${UPX_VERSION}/${UPX_PKG}.tar.xz" -s | tar xJvf - curl -L "https://github.com/upx/upx/releases/download/v${UPX_VERSION}/${UPX_PKG}.tar.xz" -s | tar xJvf -
cp "${UPX_PKG}/upx" . cp "${UPX_PKG}/upx" .
+1 -1
View File
@@ -11,7 +11,7 @@ on:
image_tag: image_tag:
description: 'Tag for this image build' description: 'Tag for this image build'
type: string type: string
default: 'v2.6.4' default: 'v2.6.3'
required: true required: true
mark_latest: mark_latest:
description: 'Mark this image as latest' description: 'Mark this image as latest'
+1 -1
View File
@@ -18,7 +18,7 @@ on:
version: version:
description: 'Version for this release' description: 'Version for this release'
type: string type: string
default: 'v2.6.4' default: 'v2.6.3'
required: true required: true
make_latest: make_latest:
description: 'Mark this release as latest' description: 'Mark this release as latest'
Generated
+14 -3
View File
@@ -2229,7 +2229,7 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
[[package]] [[package]]
name = "easytier" name = "easytier"
version = "2.6.4" version = "2.6.3"
dependencies = [ dependencies = [
"aes-gcm", "aes-gcm",
"anyhow", "anyhow",
@@ -2288,6 +2288,7 @@ dependencies = [
"indoc", "indoc",
"itertools 0.14.0", "itertools 0.14.0",
"kcp-sys", "kcp-sys",
"lzokay-native",
"machine-uid", "machine-uid",
"maplit", "maplit",
"mimalloc", "mimalloc",
@@ -2405,7 +2406,7 @@ dependencies = [
[[package]] [[package]]
name = "easytier-gui" name = "easytier-gui"
version = "2.6.4" version = "2.6.3"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@@ -2486,7 +2487,7 @@ dependencies = [
[[package]] [[package]]
name = "easytier-web" name = "easytier-web"
version = "2.6.4" version = "2.6.3"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@@ -4874,6 +4875,16 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "lzokay-native"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "792ba667add2798c6c3e988e630f4eb921b5cbc735044825b7111ef1582c8730"
dependencies = [
"byteorder",
"thiserror 1.0.63",
]
[[package]] [[package]]
name = "mac" name = "mac"
version = "0.1.1" version = "0.1.1"
+1 -1
View File
@@ -1,6 +1,6 @@
id=easytier_magisk id=easytier_magisk
name=EasyTier_Magisk name=EasyTier_Magisk
version=v2.6.4 version=v2.6.3
versionCode=1 versionCode=1
author=EasyTier author=EasyTier
description=easytier magisk module @EasyTier(https://github.com/EasyTier/EasyTier) description=easytier magisk module @EasyTier(https://github.com/EasyTier/EasyTier)
+1 -1
View File
@@ -1,7 +1,7 @@
{ {
"name": "easytier-gui", "name": "easytier-gui",
"type": "module", "type": "module",
"version": "2.6.4", "version": "2.6.3",
"private": true, "private": true,
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4", "packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
"scripts": { "scripts": {
+1 -1
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "easytier-gui" name = "easytier-gui"
version = "2.6.4" version = "2.6.3"
description = "EasyTier GUI" description = "EasyTier GUI"
authors = ["you"] authors = ["you"]
edition.workspace = true edition.workspace = true
+1 -9
View File
@@ -490,18 +490,10 @@ async fn init_web_client(app: AppHandle, url: Option<String>) -> Result<(), Stri
.ok_or_else(|| "Instance manager is not available".to_string())?; .ok_or_else(|| "Instance manager is not available".to_string())?;
let hooks = Arc::new(manager::GuiHooks { app: app.clone() }); let hooks = Arc::new(manager::GuiHooks { app: app.clone() });
let machine_id_state_dir = app
.path()
.app_data_dir()
.with_context(|| "Failed to resolve machine id state directory")
.map_err(|e| format!("{:#}", e))?;
let web_client = web_client::run_web_client( let web_client = web_client::run_web_client(
url.as_str(), url.as_str(),
easytier::common::MachineIdOptions { None,
explicit_machine_id: None,
state_dir: Some(machine_id_state_dir),
},
None, None,
false, false,
instance_manager, instance_manager,
+1 -1
View File
@@ -17,7 +17,7 @@
"createUpdaterArtifacts": false "createUpdaterArtifacts": false
}, },
"productName": "easytier-gui", "productName": "easytier-gui",
"version": "2.6.4", "version": "2.6.3",
"identifier": "com.kkrainbow.easytier", "identifier": "com.kkrainbow.easytier",
"plugins": { "plugins": {
"shell": { "shell": {
+1 -1
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "easytier-web" name = "easytier-web"
version = "2.6.4" version = "2.6.3"
edition.workspace = true edition.workspace = true
description = "Config server for easytier. easytier-core gets config from this and web frontend use it as restful api server." description = "Config server for easytier. easytier-core gets config from this and web frontend use it as restful api server."
@@ -99,7 +99,6 @@ const bool_flags: BoolFlag[] = [
{ field: 'disable_encryption', help: 'disable_encryption_help' }, { field: 'disable_encryption', help: 'disable_encryption_help' },
{ field: 'disable_tcp_hole_punching', help: 'disable_tcp_hole_punching_help' }, { field: 'disable_tcp_hole_punching', help: 'disable_tcp_hole_punching_help' },
{ field: 'disable_udp_hole_punching', help: 'disable_udp_hole_punching_help' }, { field: 'disable_udp_hole_punching', help: 'disable_udp_hole_punching_help' },
{ field: 'enable_udp_broadcast_relay', help: 'enable_udp_broadcast_relay_help' },
{ field: 'disable_upnp', help: 'disable_upnp_help' }, { field: 'disable_upnp', help: 'disable_upnp_help' },
{ field: 'disable_sym_hole_punching', help: 'disable_sym_hole_punching_help' }, { field: 'disable_sym_hole_punching', help: 'disable_sym_hole_punching_help' },
{ field: 'enable_magic_dns', help: 'enable_magic_dns_help' }, { field: 'enable_magic_dns', help: 'enable_magic_dns_help' },
@@ -160,9 +160,6 @@ disable_tcp_hole_punching_help: 禁用TCP打洞功能
disable_udp_hole_punching: 禁用UDP打洞 disable_udp_hole_punching: 禁用UDP打洞
disable_udp_hole_punching_help: 禁用UDP打洞功能 disable_udp_hole_punching_help: 禁用UDP打洞功能
enable_udp_broadcast_relay: UDP 广播中继
enable_udp_broadcast_relay_help: "仅 Windows:捕获物理网卡上的本机 UDP 广播包并转发给 EasyTier 对等节点,帮助局域网游戏发现房间。需要管理员权限。"
disable_upnp: 禁用 UPnP disable_upnp: 禁用 UPnP
disable_upnp_help: 禁用符合条件监听器的运行时 UPnP/NAT-PMP 端口映射;自动端口映射默认开启。 disable_upnp_help: 禁用符合条件监听器的运行时 UPnP/NAT-PMP 端口映射;自动端口映射默认开启。
@@ -263,7 +260,6 @@ event:
DhcpIpv4Conflicted: DHCP IPv4地址冲突 DhcpIpv4Conflicted: DHCP IPv4地址冲突
PortForwardAdded: 端口转发添加 PortForwardAdded: 端口转发添加
ProxyCidrsUpdated: 子网代理CIDR更新 ProxyCidrsUpdated: 子网代理CIDR更新
UdpBroadcastRelayStartResult: UDP广播中继启动结果
web: web:
login: login:
@@ -159,9 +159,6 @@ disable_tcp_hole_punching_help: Disable tcp hole punching
disable_udp_hole_punching: Disable UDP Hole Punching disable_udp_hole_punching: Disable UDP Hole Punching
disable_udp_hole_punching_help: Disable udp hole punching disable_udp_hole_punching_help: Disable udp hole punching
enable_udp_broadcast_relay: UDP Broadcast Relay
enable_udp_broadcast_relay_help: "Windows only: capture local UDP broadcast packets from physical interfaces and forward them to EasyTier peers. Helps games to find rooms in local network. Requires administrator privileges."
disable_upnp: Disable UPnP disable_upnp: Disable UPnP
disable_upnp_help: Disable runtime UPnP/NAT-PMP port mapping for eligible listeners; automatic port mapping is enabled by default. disable_upnp_help: Disable runtime UPnP/NAT-PMP port mapping for eligible listeners; automatic port mapping is enabled by default.
@@ -263,7 +260,6 @@ event:
DhcpIpv4Conflicted: DhcpIpv4Conflicted DhcpIpv4Conflicted: DhcpIpv4Conflicted
PortForwardAdded: PortForwardAdded PortForwardAdded: PortForwardAdded
ProxyCidrsUpdated: ProxyCidrsUpdated ProxyCidrsUpdated: ProxyCidrsUpdated
UdpBroadcastRelayStartResult: UDP Broadcast Relay Start Result
web: web:
login: login:
@@ -134,7 +134,6 @@ export interface NetworkConfig {
disable_tcp_hole_punching?: boolean disable_tcp_hole_punching?: boolean
disable_udp_hole_punching?: boolean disable_udp_hole_punching?: boolean
disable_upnp?: boolean disable_upnp?: boolean
enable_udp_broadcast_relay?: boolean
disable_sym_hole_punching?: boolean disable_sym_hole_punching?: boolean
enable_relay_network_whitelist?: boolean enable_relay_network_whitelist?: boolean
@@ -212,7 +211,6 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig {
disable_tcp_hole_punching: false, disable_tcp_hole_punching: false,
disable_udp_hole_punching: false, disable_udp_hole_punching: false,
disable_upnp: false, disable_upnp: false,
enable_udp_broadcast_relay: false,
disable_sym_hole_punching: false, disable_sym_hole_punching: false,
enable_relay_network_whitelist: false, enable_relay_network_whitelist: false,
relay_network_whitelist: [], relay_network_whitelist: [],
@@ -449,6 +447,4 @@ export enum EventType {
PortForwardAdded = 'PortForwardAdded', // PortForwardConfigPb PortForwardAdded = 'PortForwardAdded', // PortForwardConfigPb
ProxyCidrsUpdated = 'ProxyCidrsUpdated', // string[], string[] ProxyCidrsUpdated = 'ProxyCidrsUpdated', // string[], string[]
UdpBroadcastRelayStartResult = 'UdpBroadcastRelayStartResult', // { capture_backend?: string, error?: string }
} }
-1
View File
@@ -365,7 +365,6 @@ mod tests {
let _c = WebClient::new( let _c = WebClient::new(
connector, connector,
"test", "test",
uuid::Uuid::new_v4(),
"test", "test",
false, false,
Arc::new(NetworkInstanceManager::new()), Arc::new(NetworkInstanceManager::new()),
+5 -1
View File
@@ -3,7 +3,7 @@ name = "easytier"
description = "A full meshed p2p VPN, connecting all your devices in one network with one command." description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
homepage = "https://github.com/EasyTier/EasyTier" homepage = "https://github.com/EasyTier/EasyTier"
repository = "https://github.com/EasyTier/EasyTier" repository = "https://github.com/EasyTier/EasyTier"
version = "2.6.4" version = "2.6.3"
edition.workspace = true edition.workspace = true
rust-version.workspace = true rust-version.workspace = true
authors = ["kkrainbow"] authors = ["kkrainbow"]
@@ -221,6 +221,7 @@ async-ringbuf = "0.3.1"
service-manager = { git = "https://github.com/EasyTier/service-manager-rs.git", branch = "main" } service-manager = { git = "https://github.com/EasyTier/service-manager-rs.git", branch = "main" }
zstd = { version = "0.13", optional = true } zstd = { version = "0.13", optional = true }
lzokay-native = { version = "0.1", optional = true }
kcp-sys = { git = "https://github.com/EasyTier/kcp-sys", rev = "94964794caaed5d388463137da59b97499619e5f", optional = true } kcp-sys = { git = "https://github.com/EasyTier/kcp-sys", rev = "94964794caaed5d388463137da59b97499619e5f", optional = true }
@@ -358,6 +359,7 @@ default = [
"faketcp", "faketcp",
"magic-dns", "magic-dns",
"zstd", "zstd",
"lzo",
] ]
full = [ full = [
"websocket", "websocket",
@@ -372,6 +374,7 @@ full = [
"faketcp", "faketcp",
"magic-dns", "magic-dns",
"zstd", "zstd",
"lzo",
] ]
wireguard = ["dep:boringtun", "dep:ring"] wireguard = ["dep:boringtun", "dep:ring"]
quic = ["dep:quinn", "dep:quinn-plaintext", "dep:rustls", "dep:rcgen"] quic = ["dep:quinn", "dep:quinn-plaintext", "dep:rustls", "dep:rcgen"]
@@ -402,5 +405,6 @@ tracing = ["tokio/tracing", "dep:console-subscriber"]
magic-dns = ["dep:hickory-client", "dep:hickory-server"] magic-dns = ["dep:hickory-client", "dep:hickory-server"]
faketcp = ["dep:flume"] faketcp = ["dep:flume"]
zstd = ["dep:zstd"] zstd = ["dep:zstd"]
lzo = ["dep:lzokay-native"]
# For Network Extension on macOS # For Network Extension on macOS
macos-ne = [] macos-ne = []
+4 -10
View File
@@ -12,9 +12,9 @@ core_clap:
仅用户名:--config-server admin,将使用官方的服务器 仅用户名:--config-server admin,将使用官方的服务器
machine_id: machine_id:
en: |+ en: |+
the machine id to identify this machine, used for config recovery after disconnection, must be unique and fixed. by default it is loaded from persisted local state; on first start it may be migrated from system information or generated, then remains fixed. the machine id to identify this machine, used for config recovery after disconnection, must be unique and fixed. default is from system.
zh-CN: |+ zh-CN: |+
Web 配置服务器通过 machine id 来识别机器,用于断线重连后的配置恢复,需要保证唯一且固定不变。默认从本地持久化状态读取;首次启动时可能基于系统信息迁移或生成,之后保持固定不变 Web 配置服务器通过 machine id 来识别机器,用于断线重连后的配置恢复,需要保证唯一且固定不变。默认从系统获得
config_file: config_file:
en: "path to the config file, NOTE: the options set by cmdline args will override options in config file" en: "path to the config file, NOTE: the options set by cmdline args will override options in config file"
zh-CN: "配置文件路径,注意:命令行中的配置的选项会覆盖配置文件中的选项" zh-CN: "配置文件路径,注意:命令行中的配置的选项会覆盖配置文件中的选项"
@@ -184,9 +184,6 @@ core_clap:
disable_upnp: disable_upnp:
en: "disable runtime UPnP/NAT-PMP port mapping for eligible listeners; automatic port mapping is enabled by default" en: "disable runtime UPnP/NAT-PMP port mapping for eligible listeners; automatic port mapping is enabled by default"
zh-CN: "禁用符合条件监听器的运行时 UPnP/NAT-PMP 端口映射;自动端口映射默认开启" zh-CN: "禁用符合条件监听器的运行时 UPnP/NAT-PMP 端口映射;自动端口映射默认开启"
enable_udp_broadcast_relay:
en: "Windows only: capture local UDP broadcast packets from physical interfaces and forward them to EasyTier peers. Helps games to find rooms in local network. Requires administrator privileges."
zh-CN: "仅 Windows:捕获物理网卡上的本机 UDP 广播包并转发给 EasyTier 对等节点,帮助局域网游戏发现房间。需要管理员权限。"
relay_all_peer_rpc: relay_all_peer_rpc:
en: "relay all peer rpc packets, even if the peer is not in the relay network whitelist. this can help peers not in relay network whitelist to establish p2p connection." en: "relay all peer rpc packets, even if the peer is not in the relay network whitelist. this can help peers not in relay network whitelist to establish p2p connection."
zh-CN: "转发所有对等节点的RPC数据包,即使对等节点不在转发网络白名单中。这可以帮助白名单外网络中的对等节点建立P2P连接。" zh-CN: "转发所有对等节点的RPC数据包,即使对等节点不在转发网络白名单中。这可以帮助白名单外网络中的对等节点建立P2P连接。"
@@ -197,8 +194,8 @@ core_clap:
en: "the url of the ipv6 listener, e.g.: tcp://[::]:11010, if not set, will listen on random udp port" en: "the url of the ipv6 listener, e.g.: tcp://[::]:11010, if not set, will listen on random udp port"
zh-CN: "IPv6 监听器的URL,例如:tcp://[::]:11010,如果未设置,将在随机UDP端口上监听" zh-CN: "IPv6 监听器的URL,例如:tcp://[::]:11010,如果未设置,将在随机UDP端口上监听"
compression: compression:
en: "compression algorithm to use, support none, zstd. default is none" en: "compression algorithm to use, supported: %{algorithms}. default is none"
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none" zh-CN: "要使用的压缩算法,支持%{algorithms}。默认为 none"
mapped_listeners: mapped_listeners:
en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple." en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple."
zh-CN: "手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。例如:tcp://123.123.123.123:11223,可以指定多个。" zh-CN: "手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。例如:tcp://123.123.123.123:11223,可以指定多个。"
@@ -277,9 +274,6 @@ core_clap:
check_config: check_config:
en: Check config validity without starting the network en: Check config validity without starting the network
zh-CN: 检查配置文件的有效性并退出 zh-CN: 检查配置文件的有效性并退出
daemon:
en: Run in daemon mode
zh-CN: 以守护进程模式运行
file_log_size_mb: file_log_size_mb:
en: "per file log size in MB, default is 100MB" en: "per file log size in MB, default is 100MB"
zh-CN: "单个文件日志大小,单位 MB,默认值为 100MB" zh-CN: "单个文件日志大小,单位 MB,默认值为 100MB"
+3 -2
View File
@@ -11,8 +11,9 @@ use windows::{
NET_FW_RULE_DIR_OUT, NET_FW_RULE_DIR_OUT,
}, },
Networking::WinSock::{ Networking::WinSock::{
IP_UNICAST_IF, IPPROTO_IP, IPPROTO_IPV6, IPV6_UNICAST_IF, SIO_UDP_CONNRESET, SOCKET, IP_UNICAST_IF, IPPROTO_IP, IPPROTO_IPV6, IPV6_UNICAST_IF, SIO_UDP_CONNRESET,
SOCKET_ERROR, WSAGetLastError, WSAIoctl, htonl, setsockopt, SO_EXCLUSIVEADDRUSE, SOCKET, SOCKET_ERROR, SOL_SOCKET, WSAGetLastError, WSAIoctl,
htonl, setsockopt,
}, },
System::Com::{ System::Com::{
CLSCTX_ALL, COINIT_MULTITHREADED, CoCreateInstance, CoInitializeEx, CoUninitialize, CLSCTX_ALL, COINIT_MULTITHREADED, CoCreateInstance, CoInitializeEx, CoUninitialize,
+46 -10
View File
@@ -1,4 +1,4 @@
#[cfg(feature = "zstd")] #[cfg(any(feature = "zstd", feature = "lzo"))]
use anyhow::Context; use anyhow::Context;
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
use dashmap::DashMap; use dashmap::DashMap;
@@ -53,6 +53,13 @@ impl DefaultCompressor {
) )
}) })
}), }),
#[cfg(feature = "lzo")]
CompressorAlgo::Lzo => lzokay_native::compress(data).with_context(|| {
format!(
"Failed to compress data with algorithm: {:?}",
compress_algo
)
}),
CompressorAlgo::None => Ok(data.to_vec()), CompressorAlgo::None => Ok(data.to_vec()),
} }
} }
@@ -85,6 +92,13 @@ impl DefaultCompressor {
compress_algo compress_algo
)) ))
}), }),
#[cfg(feature = "lzo")]
CompressorAlgo::Lzo => lzokay_native::decompress_all(data, None).with_context(|| {
format!(
"Failed to decompress data with algorithm: {:?}",
compress_algo
)
}),
CompressorAlgo::None => Ok(data.to_vec()), CompressorAlgo::None => Ok(data.to_vec()),
} }
} }
@@ -181,14 +195,13 @@ thread_local! {
static DCTX_MAP: RefCell<DashMap<CompressorAlgo, bulk::Decompressor<'static>>> = RefCell::new(DashMap::new()); static DCTX_MAP: RefCell<DashMap<CompressorAlgo, bulk::Decompressor<'static>>> = RefCell::new(DashMap::new());
} }
#[cfg(all(test, feature = "zstd"))] #[cfg(all(test, any(feature = "zstd", feature = "lzo")))]
pub mod tests { pub mod tests {
use super::*; use super::*;
#[tokio::test] async fn test_compress_algo(compress_algo: CompressorAlgo) {
async fn test_compress() { let text = vec![b'a'; 4096];
let text = b"12345670000000000000000000"; let mut packet = ZCPacket::new_with_payload(&text);
let mut packet = ZCPacket::new_with_payload(text);
packet.fill_peer_manager_hdr(0, 0, 0); packet.fill_peer_manager_hdr(0, 0, 0);
let compressor = DefaultCompressor {}; let compressor = DefaultCompressor {};
@@ -200,7 +213,7 @@ pub mod tests {
); );
compressor compressor
.compress(&mut packet, CompressorAlgo::ZstdDefault) .compress(&mut packet, compress_algo)
.await .await
.unwrap(); .unwrap();
println!( println!(
@@ -215,8 +228,7 @@ pub mod tests {
assert!(!packet.peer_manager_header().unwrap().is_compressed()); assert!(!packet.peer_manager_header().unwrap().is_compressed());
} }
#[tokio::test] async fn test_short_text_compress_algo(compress_algo: CompressorAlgo) {
async fn test_short_text_compress() {
let text = b"1234"; let text = b"1234";
let mut packet = ZCPacket::new_with_payload(text); let mut packet = ZCPacket::new_with_payload(text);
packet.fill_peer_manager_hdr(0, 0, 0); packet.fill_peer_manager_hdr(0, 0, 0);
@@ -225,7 +237,7 @@ pub mod tests {
// short text can't be compressed // short text can't be compressed
compressor compressor
.compress(&mut packet, CompressorAlgo::ZstdDefault) .compress(&mut packet, compress_algo)
.await .await
.unwrap(); .unwrap();
assert!(!packet.peer_manager_header().unwrap().is_compressed()); assert!(!packet.peer_manager_header().unwrap().is_compressed());
@@ -234,4 +246,28 @@ pub mod tests {
assert_eq!(packet.payload(), text); assert_eq!(packet.payload(), text);
assert!(!packet.peer_manager_header().unwrap().is_compressed()); assert!(!packet.peer_manager_header().unwrap().is_compressed());
} }
#[cfg(feature = "zstd")]
#[tokio::test]
async fn test_zstd_compress() {
test_compress_algo(CompressorAlgo::ZstdDefault).await;
}
#[cfg(feature = "zstd")]
#[tokio::test]
async fn test_zstd_short_text_compress() {
test_short_text_compress_algo(CompressorAlgo::ZstdDefault).await;
}
#[cfg(feature = "lzo")]
#[tokio::test]
async fn test_lzo_compress() {
test_compress_algo(CompressorAlgo::Lzo).await;
}
#[cfg(feature = "lzo")]
#[tokio::test]
async fn test_lzo_short_text_compress() {
test_short_text_compress_algo(CompressorAlgo::Lzo).await;
}
} }
-1
View File
@@ -72,7 +72,6 @@ pub fn gen_default_flags() -> Flags {
instance_recv_bps_limit: u64::MAX, instance_recv_bps_limit: u64::MAX,
disable_upnp: false, disable_upnp: false,
disable_relay_data: false, disable_relay_data: false,
enable_udp_broadcast_relay: false,
} }
} }
+2
View File
@@ -23,6 +23,8 @@ define_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS, u64, 1000);
define_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, u64, 10); define_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, u64, 10);
define_global_var!(MACHINE_UID, Option<String>, None);
define_global_var!(MAX_DIRECT_CONNS_PER_PEER_IN_FOREIGN_NETWORK, u32, 3); define_global_var!(MAX_DIRECT_CONNS_PER_PEER_IN_FOREIGN_NETWORK, u32, 3);
define_global_var!(DIRECT_CONNECT_TO_PUBLIC_SERVER, bool, true); define_global_var!(DIRECT_CONNECT_TO_PUBLIC_SERVER, bool, true);
-5
View File
@@ -77,11 +77,6 @@ pub enum GlobalCtxEvent {
ProxyCidrsUpdated(Vec<cidr::Ipv4Cidr>, Vec<cidr::Ipv4Cidr>), // (added, removed) ProxyCidrsUpdated(Vec<cidr::Ipv4Cidr>, Vec<cidr::Ipv4Cidr>), // (added, removed)
UdpBroadcastRelayStartResult {
capture_backend: Option<String>,
error: Option<String>,
},
CredentialChanged, CredentialChanged,
} }
-596
View File
@@ -1,596 +0,0 @@
use std::{
env,
ffi::OsString,
io::Write as _,
path::{Path, PathBuf},
time::{Duration, Instant},
};
use anyhow::Context as _;
#[cfg(unix)]
use nix::{
errno::Errno,
fcntl::{Flock, FlockArg},
};
#[derive(Debug, Clone, Default)]
pub struct MachineIdOptions {
pub explicit_machine_id: Option<String>,
pub state_dir: Option<PathBuf>,
}
pub fn resolve_machine_id(opts: &MachineIdOptions) -> anyhow::Result<uuid::Uuid> {
if let Some(explicit_machine_id) = opts.explicit_machine_id.as_deref() {
return Ok(parse_or_hash_machine_id(explicit_machine_id));
}
let state_file = resolve_machine_id_state_file(opts.state_dir.as_deref())?;
let allow_legacy_machine_uid_migration =
should_attempt_legacy_machine_uid_migration(&state_file);
if let Some(machine_id) = read_state_machine_id(&state_file)? {
return Ok(machine_id);
}
if let Some(machine_id) = read_legacy_machine_id_file() {
return persist_machine_id(&state_file, machine_id);
}
if allow_legacy_machine_uid_migration
&& let Some(machine_id) = resolve_legacy_machine_uid_hash()
{
return persist_machine_id(&state_file, machine_id);
}
let machine_id = resolve_new_machine_id().unwrap_or_else(uuid::Uuid::new_v4);
persist_machine_id(&state_file, machine_id)
}
fn parse_or_hash_machine_id(raw: &str) -> uuid::Uuid {
if let Ok(mid) = uuid::Uuid::parse_str(raw.trim()) {
return mid;
}
digest_uuid_from_str(raw)
}
fn digest_uuid_from_str(raw: &str) -> uuid::Uuid {
let mut b = [0u8; 16];
crate::tunnel::generate_digest_from_str("", raw, &mut b);
uuid::Uuid::from_bytes(b)
}
fn resolve_machine_id_state_file(state_dir: Option<&Path>) -> anyhow::Result<PathBuf> {
let state_dir = match state_dir {
Some(dir) => dir.to_path_buf(),
None => default_machine_id_state_dir()?,
};
Ok(state_dir.join("machine_id"))
}
fn non_empty_os_string(value: Option<OsString>) -> Option<OsString> {
value.filter(|value| !value.is_empty())
}
#[cfg(target_os = "linux")]
fn default_linux_machine_id_state_dir(
xdg_data_home: Option<OsString>,
home: Option<OsString>,
) -> PathBuf {
if let Some(path) = non_empty_os_string(xdg_data_home) {
return PathBuf::from(path).join("easytier");
}
if let Some(home) = non_empty_os_string(home) {
return PathBuf::from(home)
.join(".local")
.join("share")
.join("easytier");
}
PathBuf::from("/var/lib/easytier")
}
fn default_machine_id_state_dir() -> anyhow::Result<PathBuf> {
cfg_select! {
target_os = "linux" => Ok(default_linux_machine_id_state_dir(
env::var_os("XDG_DATA_HOME"),
env::var_os("HOME"),
)),
all(target_os = "macos", not(feature = "macos-ne")) => {
let home = non_empty_os_string(env::var_os("HOME"))
.ok_or_else(|| anyhow::anyhow!("HOME is not set, cannot resolve machine id state directory"))?;
Ok(PathBuf::from(home)
.join("Library")
.join("Application Support")
.join("com.easytier"))
},
target_os = "windows" => {
let local_app_data = non_empty_os_string(env::var_os("LOCALAPPDATA")).ok_or_else(|| {
anyhow::anyhow!("LOCALAPPDATA is not set, cannot resolve machine id state directory")
})?;
Ok(PathBuf::from(local_app_data).join("easytier"))
},
target_os = "freebsd" => {
let home = non_empty_os_string(env::var_os("HOME"))
.ok_or_else(|| anyhow::anyhow!("HOME is not set, cannot resolve machine id state directory"))?;
Ok(PathBuf::from(home).join(".local").join("share").join("easytier"))
},
target_os = "android" => {
anyhow::bail!("machine id state directory must be provided explicitly on Android");
},
_ => anyhow::bail!("machine id state directory is unsupported on this platform"),
}
}
fn read_state_machine_id(path: &Path) -> anyhow::Result<Option<uuid::Uuid>> {
let Some(contents) = read_optional_file(path)? else {
return Ok(None);
};
let machine_id = uuid::Uuid::parse_str(contents.trim())
.with_context(|| format!("invalid machine id in state file {}", path.display()))?;
Ok(Some(machine_id))
}
fn read_legacy_machine_id_file() -> Option<uuid::Uuid> {
let path = legacy_machine_id_file_path()?;
read_legacy_machine_id_file_at(&path)
}
fn read_legacy_machine_id_file_at(path: &Path) -> Option<uuid::Uuid> {
let contents = match std::fs::read_to_string(path) {
Ok(contents) => contents,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return None,
Err(err) => {
tracing::warn!(
path = %path.display(),
%err,
"ignoring unreadable legacy machine id file"
);
return None;
}
};
match uuid::Uuid::parse_str(contents.trim()) {
Ok(machine_id) => Some(machine_id),
Err(err) => {
tracing::warn!(
path = %path.display(),
%err,
"ignoring invalid legacy machine id file"
);
None
}
}
}
fn legacy_machine_id_file_path() -> Option<PathBuf> {
std::env::current_exe()
.ok()
.map(|path| path.with_file_name("et_machine_id"))
}
fn read_optional_file(path: &Path) -> anyhow::Result<Option<String>> {
match std::fs::read_to_string(path) {
Ok(contents) => Ok(Some(contents)),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err).with_context(|| format!("failed to read {}", path.display())),
}
}
fn should_attempt_legacy_machine_uid_migration(state_file: &Path) -> bool {
let Some(state_dir) = state_file.parent() else {
return false;
};
let Ok(mut entries) = std::fs::read_dir(state_dir) else {
return false;
};
entries.any(|entry| entry.is_ok())
}
fn resolve_legacy_machine_uid_hash() -> Option<uuid::Uuid> {
machine_uid_seed().map(|seed| digest_uuid_from_str(seed.as_str()))
}
fn resolve_new_machine_id() -> Option<uuid::Uuid> {
let seed = machine_uid_seed()?;
#[cfg(target_os = "linux")]
{
let seed = linux_machine_id_seed(&seed);
Some(digest_uuid_from_str(&seed))
}
#[cfg(not(target_os = "linux"))]
{
Some(digest_uuid_from_str(&seed))
}
}
#[cfg(any(
target_os = "linux",
all(target_os = "macos", not(feature = "macos-ne")),
target_os = "windows",
target_os = "freebsd"
))]
fn machine_uid_seed() -> Option<String> {
machine_uid::get()
.ok()
.filter(|value| !value.trim().is_empty())
}
#[cfg(not(any(
target_os = "linux",
all(target_os = "macos", not(feature = "macos-ne")),
target_os = "windows",
target_os = "freebsd"
)))]
fn machine_uid_seed() -> Option<String> {
None
}
#[cfg(target_os = "linux")]
fn linux_machine_id_seed(machine_uid: &str) -> String {
let mut seed = format!("machine_uid={machine_uid}");
let hostname = gethostname::gethostname()
.to_string_lossy()
.trim()
.to_string();
if !hostname.is_empty() {
seed.push_str("\nhostname=");
seed.push_str(&hostname);
}
let mac_addresses = collect_linux_mac_addresses();
if !mac_addresses.is_empty() {
seed.push_str("\nmacs=");
seed.push_str(&mac_addresses.join(","));
}
seed
}
#[cfg(target_os = "linux")]
fn collect_linux_mac_addresses() -> Vec<String> {
let mut macs = Vec::new();
let Ok(entries) = std::fs::read_dir("/sys/class/net") else {
return macs;
};
for entry in entries.flatten() {
let Ok(name) = entry.file_name().into_string() else {
continue;
};
if name == "lo" {
continue;
}
let address_path = entry.path().join("address");
let Ok(address) = std::fs::read_to_string(address_path) else {
continue;
};
let address = address.trim().to_ascii_lowercase();
if address.is_empty() || address == "00:00:00:00:00:00" {
continue;
}
macs.push(address);
}
macs.sort();
macs.dedup();
macs.truncate(3);
macs
}
fn persist_machine_id(path: &Path, machine_id: uuid::Uuid) -> anyhow::Result<uuid::Uuid> {
if let Some(existing) = read_state_machine_id(path)? {
return Ok(existing);
}
let _lock = MachineIdWriteLock::acquire(path)?;
if let Some(existing) = read_state_machine_id(path)? {
return Ok(existing);
}
write_uuid_file_atomically(path, machine_id)?;
Ok(machine_id)
}
fn write_uuid_file_atomically(path: &Path, machine_id: uuid::Uuid) -> anyhow::Result<()> {
let parent = path.parent().ok_or_else(|| {
anyhow::anyhow!(
"machine id state file {} has no parent directory",
path.display()
)
})?;
std::fs::create_dir_all(parent).with_context(|| {
format!(
"failed to create machine id state directory {}",
parent.display()
)
})?;
let tmp_path = parent.join(format!(
".machine_id.tmp-{}-{}",
std::process::id(),
uuid::Uuid::new_v4()
));
{
let mut file = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp_path)
.with_context(|| format!("failed to create {}", tmp_path.display()))?;
file.write_all(machine_id.to_string().as_bytes())
.with_context(|| format!("failed to write {}", tmp_path.display()))?;
file.sync_all()
.with_context(|| format!("failed to flush {}", tmp_path.display()))?;
}
if let Err(err) = std::fs::rename(&tmp_path, path) {
let _ = std::fs::remove_file(&tmp_path);
return Err(err).with_context(|| {
format!(
"failed to move machine id state file into place at {}",
path.display()
)
});
}
Ok(())
}
struct MachineIdWriteLock {
#[cfg(unix)]
_lock: Flock<std::fs::File>,
#[cfg(not(unix))]
path: PathBuf,
}
impl MachineIdWriteLock {
    /// Ensure the state directory for `path` exists, then acquire the
    /// exclusive machine-id write lock, dispatching to the platform-specific
    /// implementation (flock on unix, lock-file creation elsewhere).
    fn acquire(path: &Path) -> anyhow::Result<Self> {
        let parent = path.parent().ok_or_else(|| {
            anyhow::anyhow!(
                "machine id state file {} has no parent directory",
                path.display()
            )
        })?;
        std::fs::create_dir_all(parent).with_context(|| {
            format!(
                "failed to create machine id state directory {}",
                parent.display()
            )
        })?;
        #[cfg(unix)]
        {
            Self::acquire_unix(path)
        }
        #[cfg(not(unix))]
        {
            Self::acquire_fallback(path)
        }
    }

    /// Unix path: take an exclusive, non-blocking flock on `<path>.lock`,
    /// retrying every 50ms for up to 5 seconds before giving up.
    #[cfg(unix)]
    fn acquire_unix(path: &Path) -> anyhow::Result<Self> {
        let lock_path = path.with_extension("lock");
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut lock_file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&lock_path)
            .with_context(|| format!("failed to open machine id lock {}", lock_path.display()))?;
        loop {
            // Flock::lock consumes the file and hands it back on failure, so
            // the same handle is reused across retries.
            match Flock::lock(lock_file, FlockArg::LockExclusiveNonblock) {
                Ok(lock) => return Ok(Self { _lock: lock }),
                Err((file, Errno::EAGAIN)) => {
                    // Lock is held by another process; retry until deadline.
                    if Instant::now() >= deadline {
                        anyhow::bail!(
                            "timed out waiting for machine id lock {}",
                            lock_path.display()
                        );
                    }
                    lock_file = file;
                    std::thread::sleep(Duration::from_millis(50));
                }
                Err((_file, err)) => {
                    // Any other errno is treated as fatal, not retried.
                    anyhow::bail!(
                        "failed to acquire machine id lock {}: {}",
                        lock_path.display(),
                        err
                    );
                }
            }
        }
    }

    /// Fallback path: use atomic `create_new` of `<path>.lock` as the lock.
    /// Retries every 50ms for up to 5 seconds; a lock file old enough to be
    /// considered abandoned (see `should_reap_stale_lock_file`) is removed
    /// and acquisition retried immediately.
    #[cfg(not(unix))]
    fn acquire_fallback(path: &Path) -> anyhow::Result<Self> {
        let lock_path = path.with_extension("lock");
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            match std::fs::OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&lock_path)
            {
                Ok(mut file) => {
                    // Record the owner pid for debugging; failure is ignored
                    // since the lock itself is the file's existence.
                    writeln!(file, "pid={}", std::process::id()).ok();
                    return Ok(Self { path: lock_path });
                }
                Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
                    if should_reap_stale_lock_file(&lock_path) {
                        // Presumed abandoned by a crashed process: remove and
                        // retry without waiting.
                        let _ = std::fs::remove_file(&lock_path);
                        continue;
                    }
                    if Instant::now() >= deadline {
                        anyhow::bail!(
                            "timed out waiting for machine id lock {}",
                            lock_path.display()
                        );
                    }
                    std::thread::sleep(Duration::from_millis(50));
                }
                Err(err) => {
                    return Err(err).with_context(|| {
                        format!("failed to acquire machine id lock {}", lock_path.display())
                    });
                }
            }
        }
    }
}
/// Decide whether the fallback lock file at `lock_path` is stale enough to
/// be treated as abandoned by a crashed process and safely deleted.
///
/// Returns `true` only when the file's mtime can be read and is at least
/// 30 seconds in the past; any metadata, mtime, or clock error keeps the
/// lock (returns `false`).
#[cfg(not(unix))]
fn should_reap_stale_lock_file(lock_path: &Path) -> bool {
    const STALE_LOCK_AGE: Duration = Duration::from_secs(30);
    std::fs::metadata(lock_path)
        .and_then(|meta| meta.modified())
        .ok()
        .and_then(|mtime| mtime.elapsed().ok())
        .is_some_and(|age| age >= STALE_LOCK_AGE)
}
impl Drop for MachineIdWriteLock {
    fn drop(&mut self) {
        // On unix the flock is released automatically when the wrapped file
        // handle drops; on other platforms we must delete the lock file we
        // created (best-effort: a failure here is silently ignored).
        #[cfg(not(unix))]
        let _ = std::fs::remove_file(&self.path);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // An explicit machine id that already parses as a UUID is used verbatim,
    // with no hashing or state-file involvement.
    #[test]
    fn test_resolve_machine_id_uses_uuid_seed_verbatim() {
        let raw = "33333333-3333-3333-3333-333333333333".to_string();
        let opts = MachineIdOptions {
            explicit_machine_id: Some(raw.clone()),
            state_dir: None,
        };
        assert_eq!(
            resolve_machine_id(&opts).unwrap(),
            uuid::Uuid::parse_str(&raw).unwrap()
        );
    }

    // A pre-existing `machine_id` state file wins over generating a new id.
    #[test]
    fn test_resolve_machine_id_reads_state_file() {
        let temp_dir = tempfile::tempdir().unwrap();
        let expected = uuid::Uuid::new_v4();
        std::fs::write(temp_dir.path().join("machine_id"), expected.to_string()).unwrap();
        let opts = MachineIdOptions {
            explicit_machine_id: None,
            state_dir: Some(temp_dir.path().to_path_buf()),
        };
        assert_eq!(resolve_machine_id(&opts).unwrap(), expected);
    }

    // Reading the legacy id from a directory (not a file) must yield None,
    // not an error.
    #[test]
    fn test_read_legacy_machine_id_file_ignores_read_errors() {
        let temp_dir = tempfile::tempdir().unwrap();
        assert_eq!(read_legacy_machine_id_file_at(temp_dir.path()), None);
    }

    // The atomic writer leaves exactly the UUID's string form on disk.
    #[test]
    fn test_write_uuid_file_atomically_writes_expected_contents() {
        let temp_dir = tempfile::tempdir().unwrap();
        let machine_id = uuid::Uuid::new_v4();
        let state_file = temp_dir.path().join("machine_id");
        write_uuid_file_atomically(&state_file, machine_id).unwrap();
        assert_eq!(
            std::fs::read_to_string(state_file).unwrap(),
            machine_id.to_string()
        );
    }

    // Empty OsStrings are filtered to None; non-empty values pass through.
    #[test]
    fn test_non_empty_os_string_filters_empty_values() {
        assert_eq!(non_empty_os_string(Some(OsString::new())), None);
        assert_eq!(
            non_empty_os_string(Some(OsString::from("foo"))),
            Some(OsString::from("foo"))
        );
    }

    // State-dir resolution precedence: XDG_DATA_HOME, then $HOME/.local/share,
    // then the system-wide /var/lib fallback when both are empty.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_default_linux_machine_id_state_dir_falls_back_in_order() {
        assert_eq!(
            default_linux_machine_id_state_dir(
                Some(OsString::from("/tmp/xdg")),
                Some(OsString::from("/tmp/home"))
            ),
            PathBuf::from("/tmp/xdg").join("easytier")
        );
        assert_eq!(
            default_linux_machine_id_state_dir(
                Some(OsString::new()),
                Some(OsString::from("/tmp/home"))
            ),
            PathBuf::from("/tmp/home")
                .join(".local")
                .join("share")
                .join("easytier")
        );
        assert_eq!(
            default_linux_machine_id_state_dir(Some(OsString::new()), Some(OsString::new())),
            PathBuf::from("/var/lib/easytier")
        );
    }

    // Persisting into a not-yet-existing nested directory creates it and
    // returns the id that was written.
    #[test]
    fn test_persist_machine_id_creates_missing_state_dir() {
        let temp_dir = tempfile::tempdir().unwrap();
        let state_file = temp_dir.path().join("nested").join("machine_id");
        let machine_id = uuid::Uuid::new_v4();
        assert_eq!(
            persist_machine_id(&state_file, machine_id).unwrap(),
            machine_id
        );
        assert_eq!(
            std::fs::read_to_string(state_file).unwrap(),
            machine_id.to_string()
        );
    }

    // Legacy migration is attempted only when the state dir exists AND
    // already contains something (an empty or missing dir means a fresh
    // install, so there is nothing to migrate).
    #[test]
    fn test_legacy_machine_uid_migration_requires_existing_state_dir_content() {
        let temp_dir = tempfile::tempdir().unwrap();
        let missing_state_file = temp_dir.path().join("missing").join("machine_id");
        assert!(!should_attempt_legacy_machine_uid_migration(
            &missing_state_file
        ));
        let empty_dir = temp_dir.path().join("empty");
        std::fs::create_dir_all(&empty_dir).unwrap();
        assert!(!should_attempt_legacy_machine_uid_migration(
            &empty_dir.join("machine_id")
        ));
        std::fs::write(empty_dir.join("config.toml"), "x=1").unwrap();
        assert!(should_attempt_legacy_machine_uid_migration(
            &empty_dir.join("machine_id")
        ));
    }
}
+76 -3
View File
@@ -1,12 +1,15 @@
use std::{ use std::{
fmt::Debug, fmt::Debug,
future, future,
io::Write as _,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use time::util::refresh_tz; use time::util::refresh_tz;
use tokio::{task::JoinSet, time::timeout}; use tokio::{task::JoinSet, time::timeout};
use tracing::Instrument; use tracing::Instrument;
use crate::{set_global_var, use_global_var};
pub mod acl_processor; pub mod acl_processor;
pub mod compressor; pub mod compressor;
pub mod config; pub mod config;
@@ -18,7 +21,6 @@ pub mod global_ctx;
pub mod idn; pub mod idn;
pub mod ifcfg; pub mod ifcfg;
pub mod log; pub mod log;
pub mod machine_id;
pub mod netns; pub mod netns;
pub mod network; pub mod network;
pub mod os_info; pub mod os_info;
@@ -29,8 +31,6 @@ pub mod token_bucket;
pub mod tracing_rolling_appender; pub mod tracing_rolling_appender;
pub mod upnp; pub mod upnp;
pub use machine_id::{MachineIdOptions, resolve_machine_id};
pub fn get_logger_timer<F: time::formatting::Formattable>( pub fn get_logger_timer<F: time::formatting::Formattable>(
format: F, format: F,
) -> tracing_subscriber::fmt::time::OffsetTime<F> { ) -> tracing_subscriber::fmt::time::OffsetTime<F> {
@@ -96,6 +96,71 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
); );
} }
pub fn set_default_machine_id(mid: Option<String>) {
set_global_var!(MACHINE_UID, mid);
}
pub fn get_machine_id() -> uuid::Uuid {
if let Some(default_mid) = use_global_var!(MACHINE_UID) {
if let Ok(mid) = uuid::Uuid::parse_str(default_mid.trim()) {
return mid;
}
let mut b = [0u8; 16];
crate::tunnel::generate_digest_from_str("", &default_mid, &mut b);
return uuid::Uuid::from_bytes(b);
}
// a path same as the binary
let machine_id_file = std::env::current_exe()
.map(|x| x.with_file_name("et_machine_id"))
.unwrap_or_else(|_| std::path::PathBuf::from("et_machine_id"));
// try load from local file
if let Ok(mid) = std::fs::read_to_string(&machine_id_file)
&& let Ok(mid) = uuid::Uuid::parse_str(mid.trim())
{
return mid;
}
#[cfg(any(
target_os = "linux",
all(target_os = "macos", not(feature = "macos-ne")),
target_os = "windows",
target_os = "freebsd"
))]
let gen_mid = machine_uid::get()
.map(|x| {
if x.is_empty() {
return uuid::Uuid::new_v4();
}
let mut b = [0u8; 16];
crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b);
uuid::Uuid::from_bytes(b)
})
.ok();
#[cfg(not(any(
target_os = "linux",
all(target_os = "macos", not(feature = "macos-ne")),
target_os = "windows",
target_os = "freebsd"
)))]
let gen_mid = None;
if let Some(mid) = gen_mid {
return mid;
}
let gen_mid = uuid::Uuid::new_v4();
// try save to local file
if let Ok(mut file) = std::fs::File::create(machine_id_file) {
let _ = file.write_all(gen_mid.to_string().as_bytes());
}
gen_mid
}
pub fn shrink_dashmap<K: Eq + std::hash::Hash, V>( pub fn shrink_dashmap<K: Eq + std::hash::Hash, V>(
map: &dashmap::DashMap<K, V>, map: &dashmap::DashMap<K, V>,
threshold: Option<usize>, threshold: Option<usize>,
@@ -145,4 +210,12 @@ mod tests {
assert_eq!(weak_js.weak_count(), 0); assert_eq!(weak_js.weak_count(), 0);
assert_eq!(weak_js.strong_count(), 0); assert_eq!(weak_js.strong_count(), 0);
} }
#[test]
fn test_get_machine_id_uses_uuid_seed_verbatim() {
let raw = "33333333-3333-3333-3333-333333333333".to_string();
set_default_machine_id(Some(raw.clone()));
assert_eq!(get_machine_id(), uuid::Uuid::parse_str(&raw).unwrap());
set_default_machine_id(None);
}
} }
-22
View File
@@ -85,15 +85,6 @@ pub enum MetricName {
/// Traffic packets forwarded for foreign network, forward /// Traffic packets forwarded for foreign network, forward
TrafficPacketsForeignForwardForwarded, TrafficPacketsForeignForwardForwarded,
/// UDP broadcast relay packets captured from the raw socket
UdpBroadcastRelayPacketsCaptured,
/// UDP broadcast relay packets ignored before forwarding
UdpBroadcastRelayPacketsIgnored,
/// UDP broadcast relay packets forwarded
UdpBroadcastRelayPacketsForwarded,
/// UDP broadcast relay packets that failed to forward
UdpBroadcastRelayPacketsForwardFailed,
/// Compression bytes before compression /// Compression bytes before compression
CompressionBytesRxBefore, CompressionBytesRxBefore,
/// Compression bytes after compression /// Compression bytes after compression
@@ -176,19 +167,6 @@ impl fmt::Display for MetricName {
write!(f, "traffic_packets_foreign_forward_forwarded") write!(f, "traffic_packets_foreign_forward_forwarded")
} }
MetricName::UdpBroadcastRelayPacketsCaptured => {
write!(f, "udp_broadcast_relay_packets_captured")
}
MetricName::UdpBroadcastRelayPacketsIgnored => {
write!(f, "udp_broadcast_relay_packets_ignored")
}
MetricName::UdpBroadcastRelayPacketsForwarded => {
write!(f, "udp_broadcast_relay_packets_forwarded")
}
MetricName::UdpBroadcastRelayPacketsForwardFailed => {
write!(f, "udp_broadcast_relay_packets_forward_failed")
}
MetricName::CompressionBytesRxBefore => write!(f, "compression_bytes_rx_before"), MetricName::CompressionBytesRxBefore => write!(f, "compression_bytes_rx_before"),
MetricName::CompressionBytesRxAfter => write!(f, "compression_bytes_rx_after"), MetricName::CompressionBytesRxAfter => write!(f, "compression_bytes_rx_after"),
MetricName::CompressionBytesTxBefore => write!(f, "compression_bytes_tx_before"), MetricName::CompressionBytesTxBefore => write!(f, "compression_bytes_tx_before"),
+52 -162
View File
@@ -1,8 +1,6 @@
use std::{ use std::{
collections::BTreeSet, collections::BTreeSet,
future::Future,
sync::{Arc, Weak}, sync::{Arc, Weak},
time::{Duration, Instant},
}; };
use dashmap::DashSet; use dashmap::DashSet;
@@ -18,7 +16,7 @@ use crate::{
}, },
rpc_types::{self, controller::BaseController}, rpc_types::{self, controller::BaseController},
}, },
tunnel::{IpVersion, TunnelConnector, TunnelScheme, matches_scheme}, tunnel::{IpVersion, TunnelConnector},
utils::weak_upgrade, utils::weak_upgrade,
}; };
@@ -85,55 +83,6 @@ impl ManualConnectorManager {
ret ret
} }
fn reconnect_timeout(dead_url: &url::Url) -> Duration {
let use_long_timeout = matches_scheme!(
dead_url,
TunnelScheme::Http | TunnelScheme::Https | TunnelScheme::Txt | TunnelScheme::Srv
) || matches!(dead_url.scheme(), "ws" | "wss");
Duration::from_secs(if use_long_timeout { 20 } else { 2 })
}
fn remaining_budget(started_at: Instant, total_timeout: Duration) -> Option<Duration> {
let remaining = total_timeout.checked_sub(started_at.elapsed())?;
(!remaining.is_zero()).then_some(remaining)
}
fn emit_connect_error(
data: &ConnectorManagerData,
dead_url: &url::Url,
ip_version: IpVersion,
error: &Error,
) {
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
dead_url.to_string(),
format!("{:?}", ip_version),
format!("{:#?}", error),
));
}
fn reconnect_timeout_error(stage: &str, duration: Duration) -> Error {
Error::AnyhowError(anyhow::anyhow!("{} timeout after {:?}", stage, duration))
}
async fn with_reconnect_timeout<T, F>(
stage: &'static str,
started_at: Instant,
total_timeout: Duration,
fut: F,
) -> Result<T, Error>
where
F: Future<Output = Result<T, Error>>,
{
let remaining = Self::remaining_budget(started_at, total_timeout)
.ok_or_else(|| Self::reconnect_timeout_error(stage, started_at.elapsed()))?;
timeout(remaining, fut)
.await
.map_err(|_| Self::reconnect_timeout_error(stage, remaining))?
}
}
impl ManualConnectorManager {
pub fn add_connector<T>(&self, connector: T) pub fn add_connector<T>(&self, connector: T)
where where
T: TunnelConnector + 'static, T: TunnelConnector + 'static,
@@ -293,18 +242,11 @@ impl ManualConnectorManager {
async fn conn_reconnect_with_ip_version( async fn conn_reconnect_with_ip_version(
data: Arc<ConnectorManagerData>, data: Arc<ConnectorManagerData>,
dead_url: url::Url, dead_url: String,
ip_version: IpVersion, ip_version: IpVersion,
started_at: Instant,
total_timeout: Duration,
) -> Result<ReconnResult, Error> { ) -> Result<ReconnResult, Error> {
let connector = Self::with_reconnect_timeout( let connector =
"resolve", create_connector_by_url(&dead_url, &data.global_ctx.clone(), ip_version).await?;
started_at,
total_timeout,
create_connector_by_url(dead_url.as_str(), &data.global_ctx, ip_version),
)
.await?;
data.global_ctx data.global_ctx
.issue_event(GlobalCtxEvent::Connecting(connector.remote_url())); .issue_event(GlobalCtxEvent::Connecting(connector.remote_url()));
@@ -315,25 +257,10 @@ impl ManualConnectorManager {
))); )));
}; };
let tunnel = Self::with_reconnect_timeout( let (peer_id, conn_id) = pm.try_direct_connect(connector).await?;
"connect",
started_at,
total_timeout,
pm.connect_tunnel(connector),
)
.await?;
let (peer_id, conn_id) = Self::with_reconnect_timeout(
"handshake",
started_at,
total_timeout,
pm.add_client_tunnel_with_peer_id_hint(tunnel, true, None),
)
.await?;
tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url);
Ok(ReconnResult { Ok(ReconnResult {
dead_url: dead_url.to_string(), dead_url,
peer_id, peer_id,
conn_id, conn_id,
}) })
@@ -346,33 +273,22 @@ impl ManualConnectorManager {
tracing::info!("reconnect: {}", dead_url); tracing::info!("reconnect: {}", dead_url);
let mut ip_versions = vec![]; let mut ip_versions = vec![];
if matches_scheme!( if dead_url.scheme() == "ring" || dead_url.scheme() == "txt" || dead_url.scheme() == "srv" {
dead_url,
TunnelScheme::Ring | TunnelScheme::Txt | TunnelScheme::Srv
) {
ip_versions.push(IpVersion::Both); ip_versions.push(IpVersion::Both);
} else { } else {
let converted_dead_url = let converted_dead_url = crate::common::idn::convert_idn_to_ascii(dead_url.clone())?;
match crate::common::idn::convert_idn_to_ascii(dead_url.clone()) { let addrs = match socket_addrs(&converted_dead_url, || Some(1000)).await {
Ok(url) => url,
Err(error) => {
let error: Error = error.into();
Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error);
return Err(error);
}
};
let addrs = match Self::with_reconnect_timeout(
"resolve",
Instant::now(),
Self::reconnect_timeout(&dead_url),
socket_addrs(&converted_dead_url, || Some(1000)),
)
.await
{
Ok(addrs) => addrs, Ok(addrs) => addrs,
Err(error) => { Err(e) => {
Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error); data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
return Err(error); dead_url.to_string(),
format!("{:?}", IpVersion::Both),
format!("{:?}", e),
));
return Err(Error::AnyhowError(anyhow::anyhow!(
"get ip from url failed: {:?}",
e
)));
} }
}; };
tracing::info!(?addrs, ?dead_url, "get ip from url done"); tracing::info!(?addrs, ?dead_url, "get ip from url done");
@@ -397,24 +313,46 @@ impl ManualConnectorManager {
"cannot get ip from url" "cannot get ip from url"
))); )));
for ip_version in ip_versions { for ip_version in ip_versions {
let started_at = Instant::now(); let use_long_timeout = dead_url.scheme() == "http"
let ret = Self::conn_reconnect_with_ip_version( || dead_url.scheme() == "https"
data.clone(), || dead_url.scheme() == "ws"
dead_url.clone(), || dead_url.scheme() == "wss"
ip_version, || dead_url.scheme() == "txt"
started_at, || dead_url.scheme() == "srv";
Self::reconnect_timeout(&dead_url), let ret = timeout(
// allow http/websocket connector to wait longer
std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }),
Self::conn_reconnect_with_ip_version(
data.clone(),
dead_url.to_string(),
ip_version,
),
) )
.await; .await;
tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret); tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret);
match ret { match ret {
Ok(result) => return Ok(result), Ok(Ok(_)) => {
Err(error) => { // 外层和内层都成功:解包并跳出
Self::emit_connect_error(&data, &dead_url, ip_version, &error); reconn_ret = ret.unwrap();
reconn_ret = Err(error); break;
}
Ok(Err(e)) => {
// 外层成功,内层失败
reconn_ret = Err(e);
}
Err(e) => {
// 外层失败
reconn_ret = Err(e.into());
} }
} }
// 发送事件(只有在未 break 时才执行)
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
dead_url.to_string(),
format!("{:?}", ip_version),
format!("{:?}", reconn_ret),
));
} }
reconn_ret reconn_ret
@@ -450,54 +388,6 @@ mod tests {
use super::*; use super::*;
#[tokio::test]
async fn reconnect_timeout_reports_exhausted_budget_for_stage() {
let started_at = Instant::now() - Duration::from_millis(50);
let err = ManualConnectorManager::with_reconnect_timeout(
"resolve",
started_at,
Duration::from_millis(1),
async { Ok::<(), Error>(()) },
)
.await
.unwrap_err();
let message = err.to_string();
assert!(message.contains("resolve timeout after"));
}
#[tokio::test]
async fn reconnect_timeout_reports_stage_timeout_with_remaining_budget() {
let err = ManualConnectorManager::with_reconnect_timeout(
"handshake",
Instant::now(),
Duration::from_millis(10),
async {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok::<(), Error>(())
},
)
.await
.unwrap_err();
let message = err.to_string();
assert!(message.contains("handshake timeout after"));
}
#[tokio::test]
async fn reconnect_timeout_preserves_success_within_budget() {
let result = ManualConnectorManager::with_reconnect_timeout(
"connect",
Instant::now(),
Duration::from_millis(50),
async { Ok::<_, Error>(123_u32) },
)
.await
.unwrap();
assert_eq!(result, 123);
}
#[tokio::test] #[tokio::test]
async fn test_reconnect_with_connecting_addr() { async fn test_reconnect_with_connecting_addr() {
set_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS, 1); set_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS, 1);
+50 -26
View File
@@ -37,6 +37,38 @@ use crate::tunnel::IpScheme;
#[cfg(feature = "jemalloc-prof")] #[cfg(feature = "jemalloc-prof")]
use jemalloc_ctl::{Access as _, AsName as _, epoch, stats}; use jemalloc_ctl::{Access as _, AsName as _, epoch, stats};
fn supported_compression_algorithms() -> &'static str {
cfg_select! {
all(feature = "zstd", feature = "lzo") => "none, zstd, lzo",
feature = "zstd" => "none, zstd",
feature = "lzo" => "none, lzo",
_ => "none",
}
}
fn compression_help() -> String {
t!(
"core_clap.compression",
algorithms = supported_compression_algorithms()
)
.to_string()
}
fn parse_compression_algorithm(compression: &str) -> anyhow::Result<CompressionAlgoPb> {
match compression {
"none" => Ok(CompressionAlgoPb::None),
#[cfg(feature = "zstd")]
"zstd" => Ok(CompressionAlgoPb::Zstd),
#[cfg(feature = "lzo")]
"lzo" => Ok(CompressionAlgoPb::Lzo),
_ => anyhow::bail!(
"unknown compression algorithm: {}, supported: {}",
compression,
supported_compression_algorithms()
),
}
}
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
windows_service::define_windows_service!(ffi_service_main, win_service_main); windows_service::define_windows_service!(ffi_service_main, win_service_main);
@@ -484,15 +516,6 @@ struct NetworkOptions {
)] )]
disable_upnp: Option<bool>, disable_upnp: Option<bool>,
#[arg(
long,
env = "ET_ENABLE_UDP_BROADCAST_RELAY",
help = t!("core_clap.enable_udp_broadcast_relay").to_string(),
num_args = 0..=1,
default_missing_value = "true"
)]
enable_udp_broadcast_relay: Option<bool>,
#[arg( #[arg(
long, long,
env = "ET_RELAY_ALL_PEER_RPC", env = "ET_RELAY_ALL_PEER_RPC",
@@ -522,7 +545,7 @@ struct NetworkOptions {
#[arg( #[arg(
long, long,
env = "ET_COMPRESSION", env = "ET_COMPRESSION",
help = t!("core_clap.compression").to_string(), help = compression_help(),
)] )]
compression: Option<String>, compression: Option<String>,
@@ -1115,15 +1138,7 @@ impl NetworkOptions {
f.need_p2p = self.need_p2p.unwrap_or(f.need_p2p); f.need_p2p = self.need_p2p.unwrap_or(f.need_p2p);
f.multi_thread = self.multi_thread.unwrap_or(f.multi_thread); f.multi_thread = self.multi_thread.unwrap_or(f.multi_thread);
if let Some(compression) = &self.compression { if let Some(compression) = &self.compression {
f.data_compress_algo = match compression.as_str() { f.data_compress_algo = parse_compression_algorithm(compression)?.into();
"none" => CompressionAlgoPb::None,
"zstd" => CompressionAlgoPb::Zstd,
_ => panic!(
"unknown compression algorithm: {}, supported: none, zstd",
compression
),
}
.into();
} }
f.bind_device = self.bind_device.unwrap_or(f.bind_device); f.bind_device = self.bind_device.unwrap_or(f.bind_device);
f.enable_kcp_proxy = self.enable_kcp_proxy.unwrap_or(f.enable_kcp_proxy); f.enable_kcp_proxy = self.enable_kcp_proxy.unwrap_or(f.enable_kcp_proxy);
@@ -1151,9 +1166,6 @@ impl NetworkOptions {
.disable_sym_hole_punching .disable_sym_hole_punching
.unwrap_or(f.disable_sym_hole_punching); .unwrap_or(f.disable_sym_hole_punching);
f.disable_upnp = self.disable_upnp.unwrap_or(f.disable_upnp); f.disable_upnp = self.disable_upnp.unwrap_or(f.disable_upnp);
f.enable_udp_broadcast_relay = self
.enable_udp_broadcast_relay
.unwrap_or(f.enable_udp_broadcast_relay);
// Configure tld_dns_zone: use provided value if set // Configure tld_dns_zone: use provided value if set
if let Some(tld_dns_zone) = &self.tld_dns_zone { if let Some(tld_dns_zone) = &self.tld_dns_zone {
f.tld_dns_zone = tld_dns_zone.clone(); f.tld_dns_zone = tld_dns_zone.clone();
@@ -1348,10 +1360,7 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> {
let _web_client = if let Some(config_server_url_s) = cli.config_server.as_ref() { let _web_client = if let Some(config_server_url_s) = cli.config_server.as_ref() {
let wc = web_client::run_web_client( let wc = web_client::run_web_client(
config_server_url_s, config_server_url_s,
crate::common::MachineIdOptions { cli.machine_id.clone(),
explicit_machine_id: cli.machine_id.clone(),
state_dir: None,
},
cli.network_options.hostname.clone(), cli.network_options.hostname.clone(),
cli.network_options.secure_mode.unwrap_or(false), cli.network_options.secure_mode.unwrap_or(false),
manager.clone(), manager.clone(),
@@ -1642,6 +1651,21 @@ async fn validate_config(cli: &Cli) -> anyhow::Result<()> {
mod tests { mod tests {
use super::*; use super::*;
#[test]
fn test_compression_help_uses_supported_algorithms() {
assert!(compression_help().contains(supported_compression_algorithms()));
}
#[test]
fn test_parse_compression_algorithm_rejects_unknown() {
let err = parse_compression_algorithm("snappy")
.unwrap_err()
.to_string();
assert!(err.contains("snappy"));
assert!(err.contains(supported_compression_algorithms()));
}
#[test] #[test]
fn test_parse_listeners() { fn test_parse_listeners() {
type IpSchemeMap = fn(&IpScheme) -> String; type IpSchemeMap = fn(&IpScheme) -> String;
+6 -20
View File
@@ -193,11 +193,8 @@ struct PeerArgs {
#[derive(Subcommand, Debug)] #[derive(Subcommand, Debug)]
enum PeerSubCommand { enum PeerSubCommand {
/// List connected peers
List, List,
/// Show public IPv6 address information
Ipv6, Ipv6,
/// List foreign networks discovered by this instance
ListForeign { ListForeign {
#[arg( #[arg(
long, long,
@@ -206,7 +203,6 @@ enum PeerSubCommand {
)] )]
trusted_keys: bool, trusted_keys: bool,
}, },
/// List global foreign networks from the peer center
ListGlobalForeign, ListGlobalForeign,
} }
@@ -218,18 +214,16 @@ struct RouteArgs {
#[derive(Subcommand, Debug)] #[derive(Subcommand, Debug)]
enum RouteSubCommand { enum RouteSubCommand {
/// List routes propagated by peers
List, List,
/// Dump routes in CIDR format
Dump, Dump,
} }
#[derive(Args, Debug)] #[derive(Args, Debug)]
struct ConnectorArgs { struct ConnectorArgs {
#[arg(short, long, help = "filter connectors by virtual IPv4 address")] #[arg(short, long)]
ipv4: Option<String>, ipv4: Option<String>,
#[arg(short, long, help = "filter connectors by peer URL")] #[arg(short, long)]
peers: Vec<String>, peers: Vec<String>,
#[command(subcommand)] #[command(subcommand)]
@@ -248,7 +242,6 @@ enum ConnectorSubCommand {
#[arg(help = "connector url, e.g., tcp://1.2.3.4:11010")] #[arg(help = "connector url, e.g., tcp://1.2.3.4:11010")]
url: String, url: String,
}, },
/// List connectors
List, List,
} }
@@ -290,7 +283,6 @@ struct AclArgs {
#[derive(Subcommand, Debug)] #[derive(Subcommand, Debug)]
enum AclSubCommand { enum AclSubCommand {
/// Show ACL rule hit statistics
Stats, Stats,
} }
@@ -458,25 +450,19 @@ struct InstallArgs {
#[arg(long, default_value = env!("CARGO_PKG_DESCRIPTION"), help = "service description")] #[arg(long, default_value = env!("CARGO_PKG_DESCRIPTION"), help = "service description")]
description: String, description: String,
#[arg(long, help = "display name shown by the service manager")] #[arg(long)]
display_name: Option<String>, display_name: Option<String>,
#[arg( #[arg(long)]
long,
help = "whether to disable starting the service automatically on boot (true/false)"
)]
disable_autostart: Option<bool>, disable_autostart: Option<bool>,
#[arg( #[arg(long)]
long,
help = "whether to disable automatic restart when the service fails (true/false)"
)]
disable_restart_on_failure: Option<bool>, disable_restart_on_failure: Option<bool>,
#[arg(long, help = "path to easytier-core binary")] #[arg(long, help = "path to easytier-core binary")]
core_path: Option<PathBuf>, core_path: Option<PathBuf>,
#[arg(long, help = "working directory for the easytier-core service")] #[arg(long)]
service_work_dir: Option<PathBuf>, service_work_dir: Option<PathBuf>,
#[arg( #[arg(
+3 -3
View File
@@ -25,7 +25,7 @@ use crate::{
pub fn create_listener_by_url( pub fn create_listener_by_url(
l: &url::Url, l: &url::Url,
global_ctx: ArcGlobalCtx, _global_ctx: ArcGlobalCtx,
) -> Result<Box<dyn TunnelListener>, Error> { ) -> Result<Box<dyn TunnelListener>, Error> {
Ok(match l.try_into()? { Ok(match l.try_into()? {
TunnelScheme::Ip(scheme) => match scheme { TunnelScheme::Ip(scheme) => match scheme {
@@ -34,7 +34,7 @@ pub fn create_listener_by_url(
#[cfg(feature = "wireguard")] #[cfg(feature = "wireguard")]
IpScheme::Wg => { IpScheme::Wg => {
use crate::tunnel::wireguard::{WgConfig, WgTunnelListener}; use crate::tunnel::wireguard::{WgConfig, WgTunnelListener};
let nid = global_ctx.get_network_identity(); let nid = _global_ctx.get_network_identity();
let wg_config = WgConfig::new_from_network_identity( let wg_config = WgConfig::new_from_network_identity(
&nid.network_name, &nid.network_name,
&nid.network_secret.unwrap_or_default(), &nid.network_secret.unwrap_or_default(),
@@ -43,7 +43,7 @@ pub fn create_listener_by_url(
} }
#[cfg(feature = "quic")] #[cfg(feature = "quic")]
IpScheme::Quic => { IpScheme::Quic => {
tunnel::quic::QuicTunnelListener::new(l.clone(), global_ctx.clone()).boxed() tunnel::quic::QuicTunnelListener::new(l.clone(), _global_ctx.clone()).boxed()
} }
#[cfg(feature = "websocket")] #[cfg(feature = "websocket")]
IpScheme::Ws | IpScheme::Wss => { IpScheme::Ws | IpScheme::Wss => {
-3
View File
@@ -10,6 +10,3 @@ pub mod proxy_cidrs_monitor;
#[cfg(feature = "tun")] #[cfg(feature = "tun")]
pub mod virtual_nic; pub mod virtual_nic;
#[cfg(any(windows, test))]
pub(crate) mod windows_udp_broadcast;
@@ -1,8 +1,5 @@
#[cfg(target_os = "linux")] use std::{path::Path, sync::Arc};
use std::path::Path;
use std::sync::Arc;
#[cfg(target_os = "linux")]
use anyhow::Context; use anyhow::Context;
use cidr::{Ipv6Cidr, Ipv6Inet}; use cidr::{Ipv6Cidr, Ipv6Inet};
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
@@ -324,7 +321,7 @@ async fn resolve_public_ipv6_provider_runtime_state_linux(
} }
async fn resolve_public_ipv6_provider_runtime_state( async fn resolve_public_ipv6_provider_runtime_state(
_global_ctx: &ArcGlobalCtx, global_ctx: &ArcGlobalCtx,
config: PublicIpv6ProviderConfigSnapshot, config: PublicIpv6ProviderConfigSnapshot,
) -> PublicIpv6ProviderRuntimeState { ) -> PublicIpv6ProviderRuntimeState {
if !config.provider_enabled { if !config.provider_enabled {
@@ -334,7 +331,7 @@ async fn resolve_public_ipv6_provider_runtime_state(
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
{ {
return resolve_public_ipv6_provider_runtime_state_linux( return resolve_public_ipv6_provider_runtime_state_linux(
_global_ctx, global_ctx,
config.configured_prefix, config.configured_prefix,
) )
.await; .await;
-35
View File
@@ -35,8 +35,6 @@ use tokio::{
task::JoinSet, task::JoinSet,
}; };
use tokio_util::bytes::Bytes; use tokio_util::bytes::Bytes;
#[cfg(target_os = "windows")]
use tokio_util::task::AbortOnDropHandle;
use tun::{AbstractDevice, AsyncDevice, Configuration, Layer}; use tun::{AbstractDevice, AsyncDevice, Configuration, Layer};
use zerocopy::{NativeEndian, NetworkEndian}; use zerocopy::{NativeEndian, NetworkEndian};
@@ -803,9 +801,6 @@ pub struct NicCtx {
nic: Arc<Mutex<VirtualNic>>, nic: Arc<Mutex<VirtualNic>>,
tasks: JoinSet<()>, tasks: JoinSet<()>,
#[cfg(target_os = "windows")]
windows_udp_broadcast_relay: Option<AbortOnDropHandle<()>>,
} }
impl NicCtx { impl NicCtx {
@@ -824,9 +819,6 @@ impl NicCtx {
nic: Arc::new(Mutex::new(VirtualNic::new(global_ctx))), nic: Arc::new(Mutex::new(VirtualNic::new(global_ctx))),
tasks: JoinSet::new(), tasks: JoinSet::new(),
#[cfg(target_os = "windows")]
windows_udp_broadcast_relay: None,
} }
} }
@@ -1013,31 +1005,6 @@ impl NicCtx {
}); });
} }
#[cfg(target_os = "windows")]
fn start_windows_udp_broadcast_relay(&mut self, virtual_ipv4: Ipv4Inet) {
if !self.global_ctx.get_flags().enable_udp_broadcast_relay {
return;
}
let Some(peer_manager) = self.peer_mgr.upgrade() else {
tracing::warn!("peer manager is dropped, skip Windows UDP broadcast relay");
return;
};
match super::windows_udp_broadcast::start(peer_manager, virtual_ipv4) {
Ok(handle) => {
self.windows_udp_broadcast_relay = Some(handle);
tracing::info!("Windows UDP broadcast relay started");
}
Err(err) => {
tracing::warn!(
?err,
"failed to start Windows UDP broadcast relay; administrator privileges are required"
);
}
}
}
async fn apply_route_changes( async fn apply_route_changes(
ifcfg: &impl IfConfiguerTrait, ifcfg: &impl IfConfiguerTrait,
ifname: &str, ifname: &str,
@@ -1380,8 +1347,6 @@ impl NicCtx {
// Assign IPv4 address if provided // Assign IPv4 address if provided
if let Some(ipv4_addr) = ipv4_addr { if let Some(ipv4_addr) = ipv4_addr {
self.assign_ipv4_to_tun_device(ipv4_addr).await?; self.assign_ipv4_to_tun_device(ipv4_addr).await?;
#[cfg(target_os = "windows")]
self.start_windows_udp_broadcast_relay(ipv4_addr);
} }
// Assign IPv6 address if provided // Assign IPv6 address if provided
File diff suppressed because it is too large Load Diff
-22
View File
@@ -474,28 +474,6 @@ fn handle_event(
); );
} }
GlobalCtxEvent::UdpBroadcastRelayStartResult {
capture_backend,
error,
} => {
if let Some(error) = error {
event!(
warn,
?capture_backend,
%error,
"[{}] UDP broadcast relay start failed",
instance_id
);
} else {
event!(
info,
?capture_backend,
"[{}] UDP broadcast relay started",
instance_id
);
}
}
GlobalCtxEvent::CredentialChanged => { GlobalCtxEvent::CredentialChanged => {
event!(info, "[{}] credential changed", instance_id); event!(info, "[{}] credential changed", instance_id);
} }
-6
View File
@@ -820,10 +820,6 @@ impl NetworkConfig {
flags.disable_relay_data = disable_relay_data; flags.disable_relay_data = disable_relay_data;
} }
if let Some(enable_udp_broadcast_relay) = self.enable_udp_broadcast_relay {
flags.enable_udp_broadcast_relay = enable_udp_broadcast_relay;
}
if let Some(disable_sym_hole_punching) = self.disable_sym_hole_punching { if let Some(disable_sym_hole_punching) = self.disable_sym_hole_punching {
flags.disable_sym_hole_punching = disable_sym_hole_punching; flags.disable_sym_hole_punching = disable_sym_hole_punching;
} }
@@ -999,7 +995,6 @@ impl NetworkConfig {
result.disable_udp_hole_punching = Some(flags.disable_udp_hole_punching); result.disable_udp_hole_punching = Some(flags.disable_udp_hole_punching);
result.disable_upnp = Some(flags.disable_upnp); result.disable_upnp = Some(flags.disable_upnp);
result.disable_relay_data = Some(flags.disable_relay_data); result.disable_relay_data = Some(flags.disable_relay_data);
result.enable_udp_broadcast_relay = Some(flags.enable_udp_broadcast_relay);
result.disable_sym_hole_punching = Some(flags.disable_sym_hole_punching); result.disable_sym_hole_punching = Some(flags.disable_sym_hole_punching);
result.enable_magic_dns = Some(flags.accept_dns); result.enable_magic_dns = Some(flags.accept_dns);
result.mtu = Some(flags.mtu as i32); result.mtu = Some(flags.mtu as i32);
@@ -1268,7 +1263,6 @@ mod tests {
flags.disable_tcp_hole_punching = rng.gen_bool(0.2); flags.disable_tcp_hole_punching = rng.gen_bool(0.2);
flags.disable_udp_hole_punching = rng.gen_bool(0.2); flags.disable_udp_hole_punching = rng.gen_bool(0.2);
flags.disable_upnp = rng.gen_bool(0.2); flags.disable_upnp = rng.gen_bool(0.2);
flags.enable_udp_broadcast_relay = rng.gen_bool(0.2);
flags.accept_dns = rng.gen_bool(0.6); flags.accept_dns = rng.gen_bool(0.6);
flags.mtu = rng.gen_range(1200..1500); flags.mtu = rng.gen_range(1200..1500);
flags.private_mode = rng.gen_bool(0.3); flags.private_mode = rng.gen_bool(0.3);
+15 -125
View File
@@ -6,15 +6,11 @@ in the future, with the help wo peer center we can forward packets of peers that
connected to any node in the local network. connected to any node in the local network.
*/ */
use std::{ use std::{
sync::{ sync::{Arc, Weak},
Arc, Weak,
atomic::{AtomicBool, Ordering},
},
time::SystemTime, time::SystemTime,
}; };
use dashmap::{DashMap, DashSet}; use dashmap::{DashMap, DashSet};
use guarden::defer;
use tokio::{ use tokio::{
sync::{ sync::{
Mutex, Mutex,
@@ -97,7 +93,6 @@ struct ForeignNetworkEntry {
stats_mgr: Arc<StatsManager>, stats_mgr: Arc<StatsManager>,
traffic_metrics: Arc<TrafficMetricRecorder>, traffic_metrics: Arc<TrafficMetricRecorder>,
event_handler_started: AtomicBool,
tasks: Mutex<JoinSet<()>>, tasks: Mutex<JoinSet<()>>,
@@ -165,11 +160,10 @@ impl ForeignNetworkEntry {
InstanceLabelKind::From, InstanceLabelKind::From,
)), )),
{ {
let peer_map = Arc::downgrade(&peer_map); let peer_map = peer_map.clone();
move |peer_id| { move |peer_id| {
let peer_map = peer_map.clone(); let peer_map = peer_map.clone();
async move { async move {
let peer_map = peer_map.upgrade()?;
peer_map peer_map
.get_route_peer_info(peer_id) .get_route_peer_info(peer_id)
.await .await
@@ -236,7 +230,6 @@ impl ForeignNetworkEntry {
stats_mgr, stats_mgr,
traffic_metrics, traffic_metrics,
event_handler_started: AtomicBool::new(false),
tasks: Mutex::new(JoinSet::new()), tasks: Mutex::new(JoinSet::new()),
@@ -681,8 +674,6 @@ struct ForeignNetworkManagerData {
network_peer_last_update: DashMap<String, SystemTime>, network_peer_last_update: DashMap<String, SystemTime>,
accessor: Arc<Box<dyn GlobalForeignNetworkAccessor>>, accessor: Arc<Box<dyn GlobalForeignNetworkAccessor>>,
lock: std::sync::Mutex<()>, lock: std::sync::Mutex<()>,
#[cfg(test)]
fail_next_add_peer_conn_after_entry_insert: AtomicBool,
} }
impl ForeignNetworkManagerData { impl ForeignNetworkManagerData {
@@ -741,36 +732,6 @@ impl ForeignNetworkManagerData {
shrink_dashmap(&self.network_peer_last_update, None); shrink_dashmap(&self.network_peer_last_update, None);
} }
fn remove_network_if_current(
&self,
network_name: &String,
expected_entry: &Weak<ForeignNetworkEntry>,
) {
let _l = self.lock.lock().unwrap();
let Some(expected_entry) = expected_entry.upgrade() else {
return;
};
let old = self
.network_peer_maps
.remove_if(network_name, |_, entry| Arc::ptr_eq(entry, &expected_entry));
let Some((_, old)) = old else {
return;
};
old.traffic_metrics.clear_peer_cache();
let to_remove_peers = old.peer_map.list_peers();
for p in to_remove_peers {
self.peer_network_map.remove_if(&p, |_, v| {
v.remove(network_name);
v.is_empty()
});
}
self.network_peer_last_update.remove(network_name);
shrink_dashmap(&self.peer_network_map, None);
shrink_dashmap(&self.network_peer_maps, None);
shrink_dashmap(&self.network_peer_last_update, None);
}
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn get_or_insert_entry( async fn get_or_insert_entry(
&self, &self,
@@ -913,8 +874,6 @@ impl ForeignNetworkManager {
network_peer_last_update: DashMap::new(), network_peer_last_update: DashMap::new(),
accessor: Arc::new(accessor), accessor: Arc::new(accessor),
lock: std::sync::Mutex::new(()), lock: std::sync::Mutex::new(()),
#[cfg(test)]
fail_next_add_peer_conn_after_entry_insert: AtomicBool::new(false),
}); });
let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new())); let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new()));
@@ -932,13 +891,6 @@ impl ForeignNetworkManager {
} }
} }
#[cfg(test)]
fn fail_next_add_peer_conn_after_entry_insert(&self) {
self.data
.fail_next_add_peer_conn_after_entry_insert
.store(true, Ordering::Release);
}
pub fn get_network_peer_id(&self, network_name: &str) -> Option<PeerId> { pub fn get_network_peer_id(&self, network_name: &str) -> Option<PeerId> {
self.data self.data
.network_peer_maps .network_peer_maps
@@ -987,35 +939,6 @@ impl ForeignNetworkManager {
) )
.await; .await;
defer!(rollback_new_entry => sync [
data = self.data.clone(),
network_name = entry.network.network_name.clone(),
peer_id = peer_conn.get_peer_id(),
should_rollback = new_added
] {
if should_rollback {
tracing::warn!(
%network_name,
"rollback newly added foreign network entry after add_peer_conn returned error"
);
data.remove_peer(peer_id, &network_name);
}
});
#[cfg(test)]
if self
.data
.fail_next_add_peer_conn_after_entry_insert
.swap(false, Ordering::AcqRel)
{
return Err(anyhow::anyhow!(
"injected add_peer_conn failure after foreign network entry insert"
)
.into());
}
self.ensure_event_handler_started(&entry);
let same_identity = entry.network == peer_network; let same_identity = entry.network == peer_network;
let peer_identity_type = peer_conn.get_peer_identity_type(); let peer_identity_type = peer_conn.get_peer_identity_type();
let credential_peer_trusted = peer_digest_empty let credential_peer_trusted = peer_digest_empty
@@ -1029,6 +952,10 @@ impl ForeignNetworkManager {
|| credential_identity_mismatch || credential_identity_mismatch
|| entry.my_peer_id != peer_conn.get_my_peer_id() || entry.my_peer_id != peer_conn.get_my_peer_id()
{ {
if new_added {
self.data
.remove_peer(peer_conn.get_peer_id(), &entry.network.network_name.clone());
}
let err = if entry.my_peer_id != peer_conn.get_my_peer_id() { let err = if entry.my_peer_id != peer_conn.get_my_peer_id() {
anyhow::anyhow!( anyhow::anyhow!(
"my peer id not match. exp: {:?} real: {:?}, need retry connect", "my peer id not match. exp: {:?} real: {:?}, need retry connect",
@@ -1053,7 +980,9 @@ impl ForeignNetworkManager {
return Err(err.into()); return Err(err.into());
} }
if !new_added && let Some(peer) = entry.peer_map.get_peer_by_id(peer_conn.get_peer_id()) { if new_added {
self.start_event_handler(&entry).await;
} else if let Some(peer) = entry.peer_map.get_peer_by_id(peer_conn.get_peer_id()) {
let direct_conns_len = peer.get_directly_connections().len(); let direct_conns_len = peer.get_directly_connections().len();
let max_count = use_global_var!(MAX_DIRECT_CONNS_PER_PEER_IN_FOREIGN_NETWORK); let max_count = use_global_var!(MAX_DIRECT_CONNS_PER_PEER_IN_FOREIGN_NETWORK);
if direct_conns_len >= max_count as usize { if direct_conns_len >= max_count as usize {
@@ -1067,31 +996,23 @@ impl ForeignNetworkManager {
} }
entry.peer_map.add_new_peer_conn(peer_conn).await?; entry.peer_map.add_new_peer_conn(peer_conn).await?;
let _ = rollback_new_entry.defuse();
Ok(()) Ok(())
} }
fn ensure_event_handler_started(&self, entry: &Arc<ForeignNetworkEntry>) { async fn start_event_handler(&self, entry: &ForeignNetworkEntry) {
if entry.event_handler_started.swap(true, Ordering::AcqRel) {
return;
}
let data = self.data.clone(); let data = self.data.clone();
let network_name = entry.network.network_name.clone(); let network_name = entry.network.network_name.clone();
let entry_for_cleanup = Arc::downgrade(entry); let traffic_metrics = entry.traffic_metrics.clone();
let traffic_metrics = Arc::downgrade(&entry.traffic_metrics);
let mut s = entry.global_ctx.subscribe(); let mut s = entry.global_ctx.subscribe();
self.tasks.lock().unwrap().spawn(async move { self.tasks.lock().unwrap().spawn(async move {
while let Ok(e) = s.recv().await { while let Ok(e) = s.recv().await {
match &e { match &e {
GlobalCtxEvent::PeerRemoved(peer_id) => { GlobalCtxEvent::PeerRemoved(peer_id) => {
tracing::info!(?e, "remove peer from foreign network manager"); tracing::info!(?e, "remove peer from foreign network manager");
if let Some(traffic_metrics) = traffic_metrics.upgrade() { traffic_metrics.remove_peer(*peer_id);
traffic_metrics.remove_peer(*peer_id); data.remove_peer(*peer_id, &network_name);
}
data.network_peer_last_update data.network_peer_last_update
.insert(network_name.clone(), SystemTime::now()); .insert(network_name.clone(), SystemTime::now());
data.remove_peer(*peer_id, &network_name);
} }
GlobalCtxEvent::PeerConnRemoved(..) => { GlobalCtxEvent::PeerConnRemoved(..) => {
tracing::info!(?e, "clear no conn peer from foreign network manager"); tracing::info!(?e, "clear no conn peer from foreign network manager");
@@ -1107,10 +1028,8 @@ impl ForeignNetworkManager {
} }
// if lagged or recv done just remove the network // if lagged or recv done just remove the network
tracing::error!("global event handler at foreign network manager exit"); tracing::error!("global event handler at foreign network manager exit");
if let Some(traffic_metrics) = traffic_metrics.upgrade() { traffic_metrics.clear_peer_cache();
traffic_metrics.clear_peer_cache(); data.remove_network(&network_name);
}
data.remove_network_if_current(&network_name, &entry_for_cleanup);
}); });
} }
@@ -1696,35 +1615,6 @@ pub mod tests {
.await; .await;
} }
#[tokio::test]
async fn failed_new_foreign_peer_conn_rolls_back_entry_maps() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
let foreign_mgr = pm_center.get_foreign_network_manager();
foreign_mgr.fail_next_add_peer_conn_after_entry_insert();
let (a_ring, b_ring) = crate::tunnel::ring::create_ring_tunnel_pair();
let (client_ret, server_ret) = tokio::time::timeout(Duration::from_secs(5), async {
tokio::join!(
pma_net1.add_client_tunnel(a_ring, false),
pm_center.add_tunnel_as_server(b_ring, true)
)
})
.await
.unwrap();
assert!(client_ret.is_ok());
assert!(server_ret.is_err());
assert!(foreign_mgr.data.get_network_entry("net1").is_none());
assert!(
foreign_mgr
.data
.get_peer_network(pma_net1.my_peer_id())
.is_none()
);
}
#[tokio::test] #[tokio::test]
async fn foreign_network_peer_removed_clears_traffic_metric_peer_cache() { async fn foreign_network_peer_removed_clears_traffic_metric_peer_cache() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
+12 -54
View File
@@ -636,27 +636,20 @@ impl PeerManager {
#[tracing::instrument] #[tracing::instrument]
pub async fn try_direct_connect_with_peer_id_hint<C>( pub async fn try_direct_connect_with_peer_id_hint<C>(
&self, &self,
connector: C, mut connector: C,
peer_id_hint: Option<PeerId>, peer_id_hint: Option<PeerId>,
) -> Result<(PeerId, PeerConnId), Error> ) -> Result<(PeerId, PeerConnId), Error>
where where
C: TunnelConnector + Debug, C: TunnelConnector + Debug,
{ {
let t = self.connect_tunnel(connector).await?; let ns = self.global_ctx.net_ns.clone();
let t = ns
.run_async(|| async move { connector.connect().await })
.await?;
self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint) self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint)
.await .await
} }
pub(crate) async fn connect_tunnel<C>(&self, mut connector: C) -> Result<Box<dyn Tunnel>, Error>
where
C: TunnelConnector + Debug,
{
let ns = self.global_ctx.net_ns.clone();
Ok(ns
.run_async(|| async move { connector.connect().await })
.await?)
}
// avoid loop back to virtual network // avoid loop back to virtual network
fn check_remote_addr_not_from_virtual_network( fn check_remote_addr_not_from_virtual_network(
&self, &self,
@@ -1569,26 +1562,17 @@ impl PeerManager {
ipv6_addr.is_multicast() || *ipv6_addr == ipv6_inet.last_address() ipv6_addr.is_multicast() || *ipv6_addr == ipv6_inet.last_address()
} }
fn select_ipv4_broadcast_peers<'a>(
routes: impl IntoIterator<Item = &'a instance::Route>,
my_peer_id: PeerId,
) -> Vec<PeerId> {
routes
.into_iter()
.filter_map(|route| {
(route.peer_id != my_peer_id && route.ipv4_addr.is_some()).then_some(route.peer_id)
})
.collect()
}
pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec<PeerId>, bool) { pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec<PeerId>, bool) {
let mut is_exit_node = false; let mut is_exit_node = false;
let mut dst_peers = vec![]; let mut dst_peers = vec![];
if self.is_all_peers_broadcast_ipv4(ipv4_addr) { if self.is_all_peers_broadcast_ipv4(ipv4_addr) {
dst_peers.extend(Self::select_ipv4_broadcast_peers( dst_peers.extend(self.peers.list_routes().await.iter().filter_map(|x| {
&self.peers.list_route_infos().await, if *x.key() != self.my_peer_id {
self.my_peer_id, Some(*x.key())
)); } else {
None
}
}));
} else if let Some(peer_id) = self.peers.get_peer_id_by_ipv4(ipv4_addr).await { } else if let Some(peer_id) = self.peers.get_peer_id_by_ipv4(ipv4_addr).await {
dst_peers.push(peer_id); dst_peers.push(peer_id);
} else if !self } else if !self
@@ -2208,32 +2192,6 @@ mod tests {
assert!(!PeerManager::should_mark_recent_traffic_for_fanout(2)); assert!(!PeerManager::should_mark_recent_traffic_for_fanout(2));
} }
fn route_with_ipv4(
peer_id: u32,
ipv4_addr: Option<std::net::Ipv4Addr>,
) -> crate::proto::api::instance::Route {
crate::proto::api::instance::Route {
peer_id,
ipv4_addr: ipv4_addr.map(|addr| cidr::Ipv4Inet::new(addr, 24).unwrap().into()),
..Default::default()
}
}
#[test]
fn ipv4_broadcast_peer_selection_skips_peers_without_ipv4() {
let routes = vec![
route_with_ipv4(1, Some(std::net::Ipv4Addr::new(10, 126, 126, 1))),
route_with_ipv4(2, None),
route_with_ipv4(3, Some(std::net::Ipv4Addr::new(10, 126, 126, 3))),
route_with_ipv4(4, None),
];
assert_eq!(
PeerManager::select_ipv4_broadcast_peers(&routes, 3),
vec![1]
);
}
#[test] #[test]
fn gc_recent_traffic_removes_expired_and_connected_entries() { fn gc_recent_traffic_removes_expired_and_connected_entries() {
let stale_peer = 1; let stale_peer = 1;
-1
View File
@@ -100,7 +100,6 @@ message NetworkConfig {
optional bool ipv6_public_addr_auto = 63; optional bool ipv6_public_addr_auto = 63;
optional string ipv6_public_addr_prefix = 64; optional string ipv6_public_addr_prefix = 64;
optional bool disable_relay_data = 65; optional bool disable_relay_data = 65;
optional bool enable_udp_broadcast_relay = 66;
} }
message PortForwardConfig { message PortForwardConfig {
+1 -1
View File
@@ -76,7 +76,6 @@ message FlagsInConfig {
uint64 instance_recv_bps_limit = 39; uint64 instance_recv_bps_limit = 39;
bool disable_upnp = 40; bool disable_upnp = 40;
bool disable_relay_data = 41; bool disable_relay_data = 41;
bool enable_udp_broadcast_relay = 42;
} }
message RpcDescriptor { message RpcDescriptor {
@@ -106,6 +105,7 @@ enum CompressionAlgoPb {
Invalid = 0; Invalid = 0;
None = 1; None = 1;
Zstd = 2; Zstd = 2;
Lzo = 3;
} }
message RpcCompressionInfo { message RpcCompressionInfo {
+4
View File
@@ -467,6 +467,8 @@ impl TryFrom<CompressionAlgoPb> for CompressorAlgo {
match value { match value {
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
CompressionAlgoPb::Zstd => Ok(CompressorAlgo::ZstdDefault), CompressionAlgoPb::Zstd => Ok(CompressorAlgo::ZstdDefault),
#[cfg(feature = "lzo")]
CompressionAlgoPb::Lzo => Ok(CompressorAlgo::Lzo),
CompressionAlgoPb::None => Ok(CompressorAlgo::None), CompressionAlgoPb::None => Ok(CompressorAlgo::None),
_ => Err(anyhow::anyhow!("Invalid CompressionAlgoPb")), _ => Err(anyhow::anyhow!("Invalid CompressionAlgoPb")),
} }
@@ -480,6 +482,8 @@ impl TryFrom<CompressorAlgo> for CompressionAlgoPb {
match value { match value {
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
CompressorAlgo::ZstdDefault => Ok(CompressionAlgoPb::Zstd), CompressorAlgo::ZstdDefault => Ok(CompressionAlgoPb::Zstd),
#[cfg(feature = "lzo")]
CompressorAlgo::Lzo => Ok(CompressionAlgoPb::Lzo),
CompressorAlgo::None => Ok(CompressionAlgoPb::None), CompressorAlgo::None => Ok(CompressionAlgoPb::None),
} }
} }
-26
View File
@@ -115,12 +115,6 @@ impl<R> FramedReader<R> {
return Some(Err(TunnelError::InvalidPacket("body too long".to_string()))); return Some(Err(TunnelError::InvalidPacket("body too long".to_string())));
} }
if body_len < PEER_MANAGER_HEADER_SIZE {
return Some(Err(TunnelError::InvalidPacket(
"body too short".to_string(),
)));
}
if buf.len() < TCP_TUNNEL_HEADER_SIZE + body_len { if buf.len() < TCP_TUNNEL_HEADER_SIZE + body_len {
// body is not complete // body is not complete
return None; return None;
@@ -561,26 +555,6 @@ pub mod tests {
tunnel::{TunnelConnector, TunnelListener, packet_def::ZCPacket}, tunnel::{TunnelConnector, TunnelListener, packet_def::ZCPacket},
}; };
#[cfg(test)]
use crate::tunnel::{
TunnelError,
packet_def::{PEER_MANAGER_HEADER_SIZE, TCP_TUNNEL_HEADER_SIZE},
};
#[test]
fn framed_reader_rejects_short_peer_manager_body() {
let mut buf = BytesMut::new();
buf.put_u32_le((PEER_MANAGER_HEADER_SIZE - 1) as u32);
buf.resize(TCP_TUNNEL_HEADER_SIZE + PEER_MANAGER_HEADER_SIZE - 1, 0);
let ret = super::FramedReader::<tokio::io::Empty>::extract_one_packet(&mut buf, 2000);
assert!(matches!(
ret,
Some(Err(TunnelError::InvalidPacket(msg))) if msg == "body too short"
));
}
pub async fn _tunnel_echo_server(tunnel: Box<dyn super::Tunnel>, once: bool) { pub async fn _tunnel_echo_server(tunnel: Box<dyn super::Tunnel>, once: bool) {
let (mut recv, mut send) = tunnel.split(); let (mut recv, mut send) = tunnel.split();
+4
View File
@@ -309,6 +309,8 @@ pub enum CompressorAlgo {
None = 0, None = 0,
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
ZstdDefault = 1, ZstdDefault = 1,
#[cfg(feature = "lzo")]
Lzo = 2,
} }
#[repr(C, packed)] #[repr(C, packed)]
@@ -323,6 +325,8 @@ impl CompressorTail {
match self.algo { match self.algo {
#[cfg(feature = "zstd")] #[cfg(feature = "zstd")]
1 => Some(CompressorAlgo::ZstdDefault), 1 => Some(CompressorAlgo::ZstdDefault),
#[cfg(feature = "lzo")]
2 => Some(CompressorAlgo::Lzo),
_ => None, _ => None,
} }
} }
-7
View File
@@ -9,7 +9,6 @@ use crate::{
pub struct Controller { pub struct Controller {
token: String, token: String,
machine_id: uuid::Uuid,
hostname: String, hostname: String,
device_os: DeviceOsInfo, device_os: DeviceOsInfo,
manager: Arc<NetworkInstanceManager>, manager: Arc<NetworkInstanceManager>,
@@ -19,7 +18,6 @@ pub struct Controller {
impl Controller { impl Controller {
pub fn new( pub fn new(
token: String, token: String,
machine_id: uuid::Uuid,
hostname: String, hostname: String,
device_os: DeviceOsInfo, device_os: DeviceOsInfo,
manager: Arc<NetworkInstanceManager>, manager: Arc<NetworkInstanceManager>,
@@ -27,7 +25,6 @@ impl Controller {
) -> Self { ) -> Self {
Controller { Controller {
token, token,
machine_id,
hostname, hostname,
device_os, device_os,
manager, manager,
@@ -47,10 +44,6 @@ impl Controller {
self.hostname.clone() self.hostname.clone()
} }
pub fn machine_id(&self) -> uuid::Uuid {
self.machine_id
}
pub fn device_os(&self) -> DeviceOsInfo { pub fn device_os(&self) -> DeviceOsInfo {
self.device_os.clone() self.device_os.clone()
} }
+6 -19
View File
@@ -2,12 +2,11 @@ use std::sync::Arc;
use crate::{ use crate::{
common::{ common::{
MachineIdOptions,
config::TomlConfigLoader, config::TomlConfigLoader,
global_ctx::{ArcGlobalCtx, GlobalCtx}, global_ctx::{ArcGlobalCtx, GlobalCtx},
log, log,
os_info::collect_device_os_info, os_info::collect_device_os_info,
resolve_machine_id, set_default_machine_id,
stun::MockStunInfoCollector, stun::MockStunInfoCollector,
}, },
connector::create_connector_by_url, connector::create_connector_by_url,
@@ -82,7 +81,6 @@ impl WebClient {
pub fn new<T: TunnelConnector + 'static, S: ToString, H: ToString>( pub fn new<T: TunnelConnector + 'static, S: ToString, H: ToString>(
connector: T, connector: T,
token: S, token: S,
machine_id: Uuid,
hostname: H, hostname: H,
secure_mode: bool, secure_mode: bool,
manager: Arc<NetworkInstanceManager>, manager: Arc<NetworkInstanceManager>,
@@ -92,7 +90,6 @@ impl WebClient {
let hooks = hooks.unwrap_or_else(|| Arc::new(DefaultHooks)); let hooks = hooks.unwrap_or_else(|| Arc::new(DefaultHooks));
let controller = Arc::new(controller::Controller::new( let controller = Arc::new(controller::Controller::new(
token.to_string(), token.to_string(),
machine_id,
hostname.to_string(), hostname.to_string(),
collect_device_os_info(), collect_device_os_info(),
manager, manager,
@@ -232,14 +229,13 @@ impl WebClient {
pub async fn run_web_client( pub async fn run_web_client(
config_server_url_s: &str, config_server_url_s: &str,
machine_id_opts: MachineIdOptions, machine_id: Option<String>,
hostname: Option<String>, hostname: Option<String>,
secure_mode: bool, secure_mode: bool,
manager: Arc<NetworkInstanceManager>, manager: Arc<NetworkInstanceManager>,
hooks: Option<Arc<dyn WebClientHooks>>, hooks: Option<Arc<dyn WebClientHooks>>,
) -> Result<WebClient> { ) -> Result<WebClient> {
let machine_id = resolve_machine_id(&machine_id_opts) set_default_machine_id(machine_id);
.with_context(|| "failed to resolve machine id for web client")?;
let config_server_url = match Url::parse(config_server_url_s) { let config_server_url = match Url::parse(config_server_url_s) {
Ok(u) => u, Ok(u) => u,
Err(_) => format!( Err(_) => format!(
@@ -293,7 +289,6 @@ pub async fn run_web_client(
global_ctx, global_ctx,
}, },
token.to_string(), token.to_string(),
machine_id,
hostname, hostname,
secure_mode, secure_mode,
manager, manager,
@@ -305,18 +300,14 @@ pub async fn run_web_client(
mod tests { mod tests {
use std::sync::{Arc, atomic::AtomicBool}; use std::sync::{Arc, atomic::AtomicBool};
use crate::{common::MachineIdOptions, instance_manager::NetworkInstanceManager}; use crate::instance_manager::NetworkInstanceManager;
#[tokio::test] #[tokio::test]
async fn test_manager_wait() { async fn test_manager_wait() {
let manager = Arc::new(NetworkInstanceManager::new()); let manager = Arc::new(NetworkInstanceManager::new());
let temp_dir = tempfile::tempdir().unwrap();
let client = super::run_web_client( let client = super::run_web_client(
format!("ring://{}/test", uuid::Uuid::new_v4()).as_str(), format!("ring://{}/test", uuid::Uuid::new_v4()).as_str(),
MachineIdOptions { None,
explicit_machine_id: None,
state_dir: Some(temp_dir.path().to_path_buf()),
},
None, None,
false, false,
manager.clone(), manager.clone(),
@@ -344,13 +335,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_run_web_client_with_unreachable_config_server() { async fn test_run_web_client_with_unreachable_config_server() {
let manager = Arc::new(NetworkInstanceManager::new()); let manager = Arc::new(NetworkInstanceManager::new());
let temp_dir = tempfile::tempdir().unwrap();
let client = super::run_web_client( let client = super::run_web_client(
"udp://config-server.invalid:22020/test", "udp://config-server.invalid:22020/test",
MachineIdOptions { None,
explicit_machine_id: None,
state_dir: Some(temp_dir.path().to_path_buf()),
},
None, None,
false, false,
manager, manager,
+7 -48
View File
@@ -101,11 +101,7 @@ impl TunnelFilter for SecureDatagramTunnelFilter {
Err(e) => return Some(Err(e)), Err(e) => return Some(Err(e)),
}; };
let payload = match checked_payload(&packet, "secure packet") { let mut cipher = ZCPacket::new_with_payload(packet.payload());
Ok(v) => v,
Err(e) => return Some(Err(e)),
};
let mut cipher = ZCPacket::new_with_payload(payload);
cipher.fill_peer_manager_hdr(0, 0, PacketType::Data as u8); cipher.fill_peer_manager_hdr(0, 0, PacketType::Data as u8);
cipher cipher
.mut_peer_manager_header() .mut_peer_manager_header()
@@ -120,27 +116,15 @@ impl TunnelFilter for SecureDatagramTunnelFilter {
)))); ))));
} }
let packet = ZCPacket::new_from_buf(cipher.payload_bytes(), ZCPacketType::DummyTunnel); Some(Ok(ZCPacket::new_from_buf(
if packet.peer_manager_header().is_none() { cipher.payload_bytes(),
return Some(Err(TunnelError::InvalidPacket( ZCPacketType::DummyTunnel,
"decrypted secure packet too short".to_string(), )))
)));
}
Some(Ok(packet))
} }
fn filter_output(&self) {} fn filter_output(&self) {}
} }
fn checked_payload<'a>(packet: &'a ZCPacket, context: &str) -> Result<&'a [u8], TunnelError> {
if packet.peer_manager_header().is_none() {
return Err(TunnelError::InvalidPacket(format!("{context} too short")));
}
Ok(packet.payload())
}
fn pack_control_packet(payload: &[u8]) -> ZCPacket { fn pack_control_packet(payload: &[u8]) -> ZCPacket {
let mut packet = ZCPacket::new_with_payload(payload); let mut packet = ZCPacket::new_with_payload(payload);
packet.fill_peer_manager_hdr(0, 0, PacketType::Data as u8); packet.fill_peer_manager_hdr(0, 0, PacketType::Data as u8);
@@ -230,8 +214,7 @@ pub async fn upgrade_client_tunnel(
Ok(None) => return Err(TunnelError::Shutdown), Ok(None) => return Err(TunnelError::Shutdown),
Err(error) => return Err(error.into()), Err(error) => return Err(error.into()),
}; };
let msg2_payload = checked_payload(&msg2_packet, "noise msg2 packet")?; let msg2_cipher = decode_noise_payload(msg2_packet.payload())
let msg2_cipher = decode_noise_payload(msg2_payload)
.ok_or_else(|| TunnelError::InvalidPacket("invalid noise msg2 magic".to_string()))?; .ok_or_else(|| TunnelError::InvalidPacket("invalid noise msg2 magic".to_string()))?;
let mut root_key_buf = [0u8; 32]; let mut root_key_buf = [0u8; 32];
let root_key_len = state let root_key_len = state
@@ -271,8 +254,7 @@ pub async fn accept_or_upgrade_server_tunnel(
)); ));
} }
}; };
let first_payload = checked_payload(&first_packet, "first packet")?; let Some(msg1_cipher) = decode_noise_payload(first_packet.payload()) else {
let Some(msg1_cipher) = decode_noise_payload(first_payload) else {
let stream = Box::pin(futures::stream::once(async move { Ok(first_packet) }).chain(stream)); let stream = Box::pin(futures::stream::once(async move { Ok(first_packet) }).chain(stream));
return Ok(( return Ok((
Box::new(RawSplitTunnel::new(info, stream, sink)) as Box<dyn Tunnel>, Box::new(RawSplitTunnel::new(info, stream, sink)) as Box<dyn Tunnel>,
@@ -321,7 +303,6 @@ pub async fn accept_or_upgrade_server_tunnel(
mod tests { mod tests {
use super::*; use super::*;
use crate::tunnel::ring::create_ring_tunnel_pair; use crate::tunnel::ring::create_ring_tunnel_pair;
use bytes::BytesMut;
#[test] #[test]
fn web_secure_cipher_algorithm_matches_support_flag() { fn web_secure_cipher_algorithm_matches_support_flag() {
@@ -357,28 +338,6 @@ mod tests {
assert!(matches!(err, TunnelError::Timeout(_))); assert!(matches!(err, TunnelError::Timeout(_)));
} }
#[tokio::test]
async fn accept_secure_tunnel_rejects_short_first_packet() {
let (server_tunnel, client_tunnel) = create_ring_tunnel_pair();
let server_task =
tokio::spawn(async move { accept_or_upgrade_server_tunnel(server_tunnel).await });
let (_stream, mut sink) = client_tunnel.split();
sink.send(ZCPacket::new_from_buf(
BytesMut::from(&b"\x01"[..]),
ZCPacketType::TCP,
))
.await
.unwrap();
let err = server_task.await.unwrap().unwrap_err();
assert!(matches!(
err,
TunnelError::InvalidPacket(msg) if msg == "first packet too short"
));
}
#[tokio::test] #[tokio::test]
async fn accept_secure_tunnel_after_short_client_delay() { async fn accept_secure_tunnel_after_short_client_delay() {
let (server_tunnel, client_tunnel) = create_ring_tunnel_pair(); let (server_tunnel, client_tunnel) = create_ring_tunnel_pair();
+5 -7
View File
@@ -7,7 +7,7 @@ use tokio::{
}; };
use crate::{ use crate::{
common::constants::EASYTIER_VERSION, common::{constants::EASYTIER_VERSION, get_machine_id},
proto::{ proto::{
rpc_impl::bidirect::BidirectRpcManager, rpc_impl::bidirect::BidirectRpcManager,
rpc_types::controller::BaseController, rpc_types::controller::BaseController,
@@ -65,13 +65,11 @@ impl Session {
tasks: &mut JoinSet<()>, tasks: &mut JoinSet<()>,
ctx: HeartbeatCtx, ctx: HeartbeatCtx,
) { ) {
let controller = controller.upgrade().unwrap(); let mid = get_machine_id();
let mid = controller.machine_id();
let inst_id = uuid::Uuid::new_v4(); let inst_id = uuid::Uuid::new_v4();
let token = controller.token(); let token = controller.upgrade().unwrap().token();
let hostname = controller.hostname(); let hostname = controller.upgrade().unwrap().hostname();
let device_os = controller.device_os(); let device_os = controller.upgrade().unwrap().device_os();
let controller = Arc::downgrade(&controller);
let ctx_clone = ctx.clone(); let ctx_clone = ctx.clone();
let mut tick = interval(std::time::Duration::from_secs(1)); let mut tick = interval(std::time::Duration::from_secs(1));