Compare commits

..

8 Commits

Author SHA1 Message Date
KKRainbow 8e1d079142 feat: add Windows UDP broadcast relay (#2222)
This may helps games to find rooms in virtual network.

- add opt-in Windows UDP broadcast relay config flag and CLI/env plumbing
- capture local UDP broadcasts with Windows raw sockets, normalize packets, and inject them via PeerManager
2026-05-09 09:56:31 +08:00
fanyang 55f15bb6f0 fix(connector): classify manual reconnect timeouts by stage (#2062) 2026-05-08 22:08:51 +08:00
Luna Yao 96fd39649a revert UPX version to 4.2.4 in core.yml (#2221) 2026-05-07 18:49:40 +08:00
KKRainbow 74fc8b300d chore: bump version to 2.6.4 (#2219) 2026-05-07 13:48:51 +08:00
KKRainbow baeee40b79 fix machine uid and easytier-web panic (#2215)
1. fix(web-client): persist and migrate machine id
2. fix panic when easytier-web session receive malformat packet
2026-05-07 00:57:42 +08:00
fanyang 4342c8d7a2 fix: add missing CLI help text (#2213) 2026-05-05 17:05:34 +08:00
KKRainbow 1178b312fa fix foreign network entry leak (#2211) 2026-05-05 11:01:44 +08:00
fanyang 362aa7a9cd fix: allow omitted ACL config fields (#2206) 2026-05-04 00:47:24 +08:00
44 changed files with 2439 additions and 203 deletions
+4 -1
View File
@@ -157,6 +157,9 @@ jobs:
- uses: mlugg/setup-zig@v2
if: ${{ contains(matrix.OS, 'ubuntu') }}
with:
version: 0.16.0
use-cache: true
- uses: taiki-e/install-action@v2
if: ${{ contains(matrix.OS, 'ubuntu') }}
@@ -227,7 +230,7 @@ jobs:
*) UPX_ARCH="amd64" ;;
esac
UPX_VERSION=5.1.1
UPX_VERSION=4.2.4
UPX_PKG="upx-${UPX_VERSION}-${UPX_ARCH}_linux"
curl -L "https://github.com/upx/upx/releases/download/v${UPX_VERSION}/${UPX_PKG}.tar.xz" -s | tar xJvf -
cp "${UPX_PKG}/upx" .
+1 -1
View File
@@ -11,7 +11,7 @@ on:
image_tag:
description: 'Tag for this image build'
type: string
default: 'v2.6.3'
default: 'v2.6.4'
required: true
mark_latest:
description: 'Mark this image as latest'
+1 -1
View File
@@ -18,7 +18,7 @@ on:
version:
description: 'Version for this release'
type: string
default: 'v2.6.3'
default: 'v2.6.4'
required: true
make_latest:
description: 'Mark this release as latest'
Generated
+3 -3
View File
@@ -2229,7 +2229,7 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
[[package]]
name = "easytier"
version = "2.6.3"
version = "2.6.4"
dependencies = [
"aes-gcm",
"anyhow",
@@ -2405,7 +2405,7 @@ dependencies = [
[[package]]
name = "easytier-gui"
version = "2.6.3"
version = "2.6.4"
dependencies = [
"anyhow",
"async-trait",
@@ -2486,7 +2486,7 @@ dependencies = [
[[package]]
name = "easytier-web"
version = "2.6.3"
version = "2.6.4"
dependencies = [
"anyhow",
"async-trait",
+1 -1
View File
@@ -1,6 +1,6 @@
id=easytier_magisk
name=EasyTier_Magisk
version=v2.6.3
version=v2.6.4
versionCode=1
author=EasyTier
description=easytier magisk module @EasyTier(https://github.com/EasyTier/EasyTier)
+1 -1
View File
@@ -1,7 +1,7 @@
{
"name": "easytier-gui",
"type": "module",
"version": "2.6.3",
"version": "2.6.4",
"private": true,
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
"scripts": {
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "easytier-gui"
version = "2.6.3"
version = "2.6.4"
description = "EasyTier GUI"
authors = ["you"]
edition.workspace = true
+9 -1
View File
@@ -490,10 +490,18 @@ async fn init_web_client(app: AppHandle, url: Option<String>) -> Result<(), Stri
.ok_or_else(|| "Instance manager is not available".to_string())?;
let hooks = Arc::new(manager::GuiHooks { app: app.clone() });
let machine_id_state_dir = app
.path()
.app_data_dir()
.with_context(|| "Failed to resolve machine id state directory")
.map_err(|e| format!("{:#}", e))?;
let web_client = web_client::run_web_client(
url.as_str(),
None,
easytier::common::MachineIdOptions {
explicit_machine_id: None,
state_dir: Some(machine_id_state_dir),
},
None,
false,
instance_manager,
+1 -1
View File
@@ -17,7 +17,7 @@
"createUpdaterArtifacts": false
},
"productName": "easytier-gui",
"version": "2.6.3",
"version": "2.6.4",
"identifier": "com.kkrainbow.easytier",
"plugins": {
"shell": {
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "easytier-web"
version = "2.6.3"
version = "2.6.4"
edition.workspace = true
description = "Config server for easytier. easytier-core gets config from this and web frontend use it as restful api server."
@@ -99,6 +99,7 @@ const bool_flags: BoolFlag[] = [
{ field: 'disable_encryption', help: 'disable_encryption_help' },
{ field: 'disable_tcp_hole_punching', help: 'disable_tcp_hole_punching_help' },
{ field: 'disable_udp_hole_punching', help: 'disable_udp_hole_punching_help' },
{ field: 'enable_udp_broadcast_relay', help: 'enable_udp_broadcast_relay_help' },
{ field: 'disable_upnp', help: 'disable_upnp_help' },
{ field: 'disable_sym_hole_punching', help: 'disable_sym_hole_punching_help' },
{ field: 'enable_magic_dns', help: 'enable_magic_dns_help' },
@@ -160,6 +160,9 @@ disable_tcp_hole_punching_help: 禁用TCP打洞功能
disable_udp_hole_punching: 禁用UDP打洞
disable_udp_hole_punching_help: 禁用UDP打洞功能
enable_udp_broadcast_relay: UDP 广播中继
enable_udp_broadcast_relay_help: "仅 Windows:捕获物理网卡上的本机 UDP 广播包并转发给 EasyTier 对等节点,帮助局域网游戏发现房间。需要管理员权限。"
disable_upnp: 禁用 UPnP
disable_upnp_help: 禁用符合条件监听器的运行时 UPnP/NAT-PMP 端口映射;自动端口映射默认开启。
@@ -260,6 +263,7 @@ event:
DhcpIpv4Conflicted: DHCP IPv4地址冲突
PortForwardAdded: 端口转发添加
ProxyCidrsUpdated: 子网代理CIDR更新
UdpBroadcastRelayStartResult: UDP广播中继启动结果
web:
login:
@@ -159,6 +159,9 @@ disable_tcp_hole_punching_help: Disable tcp hole punching
disable_udp_hole_punching: Disable UDP Hole Punching
disable_udp_hole_punching_help: Disable udp hole punching
enable_udp_broadcast_relay: UDP Broadcast Relay
enable_udp_broadcast_relay_help: "Windows only: capture local UDP broadcast packets from physical interfaces and forward them to EasyTier peers. Helps games to find rooms in local network. Requires administrator privileges."
disable_upnp: Disable UPnP
disable_upnp_help: Disable runtime UPnP/NAT-PMP port mapping for eligible listeners; automatic port mapping is enabled by default.
@@ -260,6 +263,7 @@ event:
DhcpIpv4Conflicted: DhcpIpv4Conflicted
PortForwardAdded: PortForwardAdded
ProxyCidrsUpdated: ProxyCidrsUpdated
UdpBroadcastRelayStartResult: UDP Broadcast Relay Start Result
web:
login:
@@ -134,6 +134,7 @@ export interface NetworkConfig {
disable_tcp_hole_punching?: boolean
disable_udp_hole_punching?: boolean
disable_upnp?: boolean
enable_udp_broadcast_relay?: boolean
disable_sym_hole_punching?: boolean
enable_relay_network_whitelist?: boolean
@@ -211,6 +212,7 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig {
disable_tcp_hole_punching: false,
disable_udp_hole_punching: false,
disable_upnp: false,
enable_udp_broadcast_relay: false,
disable_sym_hole_punching: false,
enable_relay_network_whitelist: false,
relay_network_whitelist: [],
@@ -447,4 +449,6 @@ export enum EventType {
PortForwardAdded = 'PortForwardAdded', // PortForwardConfigPb
ProxyCidrsUpdated = 'ProxyCidrsUpdated', // string[], string[]
UdpBroadcastRelayStartResult = 'UdpBroadcastRelayStartResult', // { capture_backend?: string, error?: string }
}
+1
View File
@@ -365,6 +365,7 @@ mod tests {
let _c = WebClient::new(
connector,
"test",
uuid::Uuid::new_v4(),
"test",
false,
Arc::new(NetworkInstanceManager::new()),
+1 -1
View File
@@ -3,7 +3,7 @@ name = "easytier"
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
homepage = "https://github.com/EasyTier/EasyTier"
repository = "https://github.com/EasyTier/EasyTier"
version = "2.6.3"
version = "2.6.4"
edition.workspace = true
rust-version.workspace = true
authors = ["kkrainbow"]
+5
View File
@@ -191,6 +191,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
)
.type_attribute("peer_rpc.RouteForeignNetworkSummary", "#[derive(Hash, Eq)]")
.type_attribute("common.RpcDescriptor", "#[derive(Hash, Eq)]")
.type_attribute("acl.Acl", "#[serde(default)]")
.type_attribute("acl.AclV1", "#[serde(default)]")
.type_attribute("acl.Chain", "#[serde(default)]")
.type_attribute("acl.Rule", "#[serde(default)]")
.type_attribute("acl.GroupInfo", "#[serde(default)]")
.field_attribute(".api.manage.NetworkConfig", "#[serde(default)]")
.service_generator(Box::new(easytier_rpc_build::ServiceGenerator::default()))
.btree_map(["."])
+8 -2
View File
@@ -12,9 +12,9 @@ core_clap:
仅用户名:--config-server admin,将使用官方的服务器
machine_id:
en: |+
the machine id to identify this machine, used for config recovery after disconnection, must be unique and fixed. default is from system.
the machine id to identify this machine, used for config recovery after disconnection, must be unique and fixed. by default it is loaded from persisted local state; on first start it may be migrated from system information or generated, then remains fixed.
zh-CN: |+
Web 配置服务器通过 machine id 来识别机器,用于断线重连后的配置恢复,需要保证唯一且固定不变。默认从系统获得
Web 配置服务器通过 machine id 来识别机器,用于断线重连后的配置恢复,需要保证唯一且固定不变。默认从本地持久化状态读取;首次启动时可能基于系统信息迁移或生成,之后保持固定不变
config_file:
en: "path to the config file, NOTE: the options set by cmdline args will override options in config file"
zh-CN: "配置文件路径,注意:命令行中的配置的选项会覆盖配置文件中的选项"
@@ -184,6 +184,9 @@ core_clap:
disable_upnp:
en: "disable runtime UPnP/NAT-PMP port mapping for eligible listeners; automatic port mapping is enabled by default"
zh-CN: "禁用符合条件监听器的运行时 UPnP/NAT-PMP 端口映射;自动端口映射默认开启"
enable_udp_broadcast_relay:
en: "Windows only: capture local UDP broadcast packets from physical interfaces and forward them to EasyTier peers. Helps games to find rooms in local network. Requires administrator privileges."
zh-CN: "仅 Windows:捕获物理网卡上的本机 UDP 广播包并转发给 EasyTier 对等节点,帮助局域网游戏发现房间。需要管理员权限。"
relay_all_peer_rpc:
en: "relay all peer rpc packets, even if the peer is not in the relay network whitelist. this can help peers not in relay network whitelist to establish p2p connection."
zh-CN: "转发所有对等节点的RPC数据包,即使对等节点不在转发网络白名单中。这可以帮助白名单外网络中的对等节点建立P2P连接。"
@@ -274,6 +277,9 @@ core_clap:
check_config:
en: Check config validity without starting the network
zh-CN: 检查配置文件的有效性并退出
daemon:
en: Run in daemon mode
zh-CN: 以守护进程模式运行
file_log_size_mb:
en: "per file log size in MB, default is 100MB"
zh-CN: "单个文件日志大小,单位 MB,默认值为 100MB"
+2 -3
View File
@@ -11,9 +11,8 @@ use windows::{
NET_FW_RULE_DIR_OUT,
},
Networking::WinSock::{
IP_UNICAST_IF, IPPROTO_IP, IPPROTO_IPV6, IPV6_UNICAST_IF, SIO_UDP_CONNRESET,
SO_EXCLUSIVEADDRUSE, SOCKET, SOCKET_ERROR, SOL_SOCKET, WSAGetLastError, WSAIoctl,
htonl, setsockopt,
IP_UNICAST_IF, IPPROTO_IP, IPPROTO_IPV6, IPV6_UNICAST_IF, SIO_UDP_CONNRESET, SOCKET,
SOCKET_ERROR, WSAGetLastError, WSAIoctl, htonl, setsockopt,
},
System::Com::{
CLSCTX_ALL, COINIT_MULTITHREADED, CoCreateInstance, CoInitializeEx, CoUninitialize,
+39
View File
@@ -1339,6 +1339,45 @@ mod tests {
assert_eq!(result.matched_rule, Some(RuleId::Priority(70)));
}
#[tokio::test]
async fn test_forward_acl_source_ip_whitelist() {
let mut acl_config = Acl::default();
let mut acl_v1 = AclV1::default();
let mut chain = Chain {
name: "subnet_proxy_protect".to_string(),
chain_type: ChainType::Forward as i32,
enabled: true,
default_action: Action::Drop as i32,
..Default::default()
};
chain.rules.push(Rule {
name: "allow_my_devices".to_string(),
priority: 1000,
enabled: true,
action: Action::Allow as i32,
protocol: Protocol::Any as i32,
source_ips: vec!["10.172.192.2/32".to_string()],
..Default::default()
});
acl_v1.chains.push(chain);
acl_config.acl_v1 = Some(acl_v1);
let processor = AclProcessor::new(acl_config);
let mut packet_info = create_test_packet_info();
packet_info.dst_ip = "192.168.1.10".parse().unwrap();
packet_info.src_ip = "10.172.192.2".parse().unwrap();
let result = processor.process_packet(&packet_info, ChainType::Forward);
assert_eq!(result.action, Action::Allow);
assert_eq!(result.matched_rule, Some(RuleId::Priority(1000)));
packet_info.src_ip = "10.172.192.3".parse().unwrap();
let result = processor.process_packet(&packet_info, ChainType::Forward);
assert_eq!(result.action, Action::Drop);
assert_eq!(result.matched_rule, Some(RuleId::Default));
}
fn create_test_acl_config() -> Acl {
let mut acl_config = Acl::default();
+66
View File
@@ -72,6 +72,7 @@ pub fn gen_default_flags() -> Flags {
instance_recv_bps_limit: u64::MAX,
disable_upnp: false,
disable_relay_data: false,
enable_udp_broadcast_relay: false,
}
}
@@ -1337,6 +1338,71 @@ stun_servers = [
assert!(err.to_string().contains("mapped listener port is missing"));
}
#[test]
fn test_acl_toml_rule_uses_defaults_for_omitted_fields() {
use crate::proto::acl::{Action, ChainType, Protocol};
let config_str = r#"
[[acl.acl_v1.chains]]
name = "subnet_proxy_protect"
chain_type = 3
enabled = true
default_action = 2
[[acl.acl_v1.chains.rules]]
name = "allow_my_devices"
priority = 1000
action = 1
source_ips = ["10.172.192.2/32"]
protocol = 5
enabled = true
"#;
let config = TomlConfigLoader::new_from_str(config_str).unwrap();
let acl = config.get_acl().unwrap();
let acl_v1 = acl.acl_v1.unwrap();
let chain = &acl_v1.chains[0];
let rule = &chain.rules[0];
assert_eq!(chain.chain_type, ChainType::Forward as i32);
assert_eq!(chain.default_action, Action::Drop as i32);
assert_eq!(rule.action, Action::Allow as i32);
assert_eq!(rule.protocol, Protocol::Any as i32);
assert_eq!(rule.source_ips, vec!["10.172.192.2/32"]);
assert!(rule.ports.is_empty());
assert!(rule.source_ports.is_empty());
assert!(rule.destination_ips.is_empty());
assert!(rule.source_groups.is_empty());
assert!(rule.destination_groups.is_empty());
assert_eq!(rule.rate_limit, 0);
assert_eq!(rule.burst_limit, 0);
assert!(!rule.stateful);
}
#[test]
fn test_acl_toml_group_can_omit_declares_or_members() {
let declares_only = r#"
[acl.acl_v1.group]
[[acl.acl_v1.group.declares]]
group_name = "admin"
group_secret = "admin-pw"
"#;
let config = TomlConfigLoader::new_from_str(declares_only).unwrap();
let group = config.get_acl().unwrap().acl_v1.unwrap().group.unwrap();
assert_eq!(group.declares.len(), 1);
assert!(group.members.is_empty());
let members_only = r#"
[acl.acl_v1.group]
members = ["admin"]
"#;
let config = TomlConfigLoader::new_from_str(members_only).unwrap();
let group = config.get_acl().unwrap().acl_v1.unwrap().group.unwrap();
assert!(group.declares.is_empty());
assert_eq!(group.members, vec!["admin"]);
}
#[test]
fn test_network_config_source_user_is_implicit() {
let config = TomlConfigLoader::default();
-2
View File
@@ -23,8 +23,6 @@ define_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS, u64, 1000);
define_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, u64, 10);
define_global_var!(MACHINE_UID, Option<String>, None);
define_global_var!(MAX_DIRECT_CONNS_PER_PEER_IN_FOREIGN_NETWORK, u32, 3);
define_global_var!(DIRECT_CONNECT_TO_PUBLIC_SERVER, bool, true);
+5
View File
@@ -77,6 +77,11 @@ pub enum GlobalCtxEvent {
ProxyCidrsUpdated(Vec<cidr::Ipv4Cidr>, Vec<cidr::Ipv4Cidr>), // (added, removed)
UdpBroadcastRelayStartResult {
capture_backend: Option<String>,
error: Option<String>,
},
CredentialChanged,
}
+596
View File
@@ -0,0 +1,596 @@
use std::{
env,
ffi::OsString,
io::Write as _,
path::{Path, PathBuf},
time::{Duration, Instant},
};
use anyhow::Context as _;
#[cfg(unix)]
use nix::{
errno::Errno,
fcntl::{Flock, FlockArg},
};
#[derive(Debug, Clone, Default)]
pub struct MachineIdOptions {
pub explicit_machine_id: Option<String>,
pub state_dir: Option<PathBuf>,
}
pub fn resolve_machine_id(opts: &MachineIdOptions) -> anyhow::Result<uuid::Uuid> {
if let Some(explicit_machine_id) = opts.explicit_machine_id.as_deref() {
return Ok(parse_or_hash_machine_id(explicit_machine_id));
}
let state_file = resolve_machine_id_state_file(opts.state_dir.as_deref())?;
let allow_legacy_machine_uid_migration =
should_attempt_legacy_machine_uid_migration(&state_file);
if let Some(machine_id) = read_state_machine_id(&state_file)? {
return Ok(machine_id);
}
if let Some(machine_id) = read_legacy_machine_id_file() {
return persist_machine_id(&state_file, machine_id);
}
if allow_legacy_machine_uid_migration
&& let Some(machine_id) = resolve_legacy_machine_uid_hash()
{
return persist_machine_id(&state_file, machine_id);
}
let machine_id = resolve_new_machine_id().unwrap_or_else(uuid::Uuid::new_v4);
persist_machine_id(&state_file, machine_id)
}
fn parse_or_hash_machine_id(raw: &str) -> uuid::Uuid {
if let Ok(mid) = uuid::Uuid::parse_str(raw.trim()) {
return mid;
}
digest_uuid_from_str(raw)
}
fn digest_uuid_from_str(raw: &str) -> uuid::Uuid {
let mut b = [0u8; 16];
crate::tunnel::generate_digest_from_str("", raw, &mut b);
uuid::Uuid::from_bytes(b)
}
fn resolve_machine_id_state_file(state_dir: Option<&Path>) -> anyhow::Result<PathBuf> {
let state_dir = match state_dir {
Some(dir) => dir.to_path_buf(),
None => default_machine_id_state_dir()?,
};
Ok(state_dir.join("machine_id"))
}
fn non_empty_os_string(value: Option<OsString>) -> Option<OsString> {
value.filter(|value| !value.is_empty())
}
#[cfg(target_os = "linux")]
fn default_linux_machine_id_state_dir(
xdg_data_home: Option<OsString>,
home: Option<OsString>,
) -> PathBuf {
if let Some(path) = non_empty_os_string(xdg_data_home) {
return PathBuf::from(path).join("easytier");
}
if let Some(home) = non_empty_os_string(home) {
return PathBuf::from(home)
.join(".local")
.join("share")
.join("easytier");
}
PathBuf::from("/var/lib/easytier")
}
fn default_machine_id_state_dir() -> anyhow::Result<PathBuf> {
cfg_select! {
target_os = "linux" => Ok(default_linux_machine_id_state_dir(
env::var_os("XDG_DATA_HOME"),
env::var_os("HOME"),
)),
all(target_os = "macos", not(feature = "macos-ne")) => {
let home = non_empty_os_string(env::var_os("HOME"))
.ok_or_else(|| anyhow::anyhow!("HOME is not set, cannot resolve machine id state directory"))?;
Ok(PathBuf::from(home)
.join("Library")
.join("Application Support")
.join("com.easytier"))
},
target_os = "windows" => {
let local_app_data = non_empty_os_string(env::var_os("LOCALAPPDATA")).ok_or_else(|| {
anyhow::anyhow!("LOCALAPPDATA is not set, cannot resolve machine id state directory")
})?;
Ok(PathBuf::from(local_app_data).join("easytier"))
},
target_os = "freebsd" => {
let home = non_empty_os_string(env::var_os("HOME"))
.ok_or_else(|| anyhow::anyhow!("HOME is not set, cannot resolve machine id state directory"))?;
Ok(PathBuf::from(home).join(".local").join("share").join("easytier"))
},
target_os = "android" => {
anyhow::bail!("machine id state directory must be provided explicitly on Android");
},
_ => anyhow::bail!("machine id state directory is unsupported on this platform"),
}
}
fn read_state_machine_id(path: &Path) -> anyhow::Result<Option<uuid::Uuid>> {
let Some(contents) = read_optional_file(path)? else {
return Ok(None);
};
let machine_id = uuid::Uuid::parse_str(contents.trim())
.with_context(|| format!("invalid machine id in state file {}", path.display()))?;
Ok(Some(machine_id))
}
fn read_legacy_machine_id_file() -> Option<uuid::Uuid> {
let path = legacy_machine_id_file_path()?;
read_legacy_machine_id_file_at(&path)
}
fn read_legacy_machine_id_file_at(path: &Path) -> Option<uuid::Uuid> {
let contents = match std::fs::read_to_string(path) {
Ok(contents) => contents,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return None,
Err(err) => {
tracing::warn!(
path = %path.display(),
%err,
"ignoring unreadable legacy machine id file"
);
return None;
}
};
match uuid::Uuid::parse_str(contents.trim()) {
Ok(machine_id) => Some(machine_id),
Err(err) => {
tracing::warn!(
path = %path.display(),
%err,
"ignoring invalid legacy machine id file"
);
None
}
}
}
fn legacy_machine_id_file_path() -> Option<PathBuf> {
std::env::current_exe()
.ok()
.map(|path| path.with_file_name("et_machine_id"))
}
fn read_optional_file(path: &Path) -> anyhow::Result<Option<String>> {
match std::fs::read_to_string(path) {
Ok(contents) => Ok(Some(contents)),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err).with_context(|| format!("failed to read {}", path.display())),
}
}
fn should_attempt_legacy_machine_uid_migration(state_file: &Path) -> bool {
let Some(state_dir) = state_file.parent() else {
return false;
};
let Ok(mut entries) = std::fs::read_dir(state_dir) else {
return false;
};
entries.any(|entry| entry.is_ok())
}
fn resolve_legacy_machine_uid_hash() -> Option<uuid::Uuid> {
machine_uid_seed().map(|seed| digest_uuid_from_str(seed.as_str()))
}
fn resolve_new_machine_id() -> Option<uuid::Uuid> {
let seed = machine_uid_seed()?;
#[cfg(target_os = "linux")]
{
let seed = linux_machine_id_seed(&seed);
Some(digest_uuid_from_str(&seed))
}
#[cfg(not(target_os = "linux"))]
{
Some(digest_uuid_from_str(&seed))
}
}
#[cfg(any(
target_os = "linux",
all(target_os = "macos", not(feature = "macos-ne")),
target_os = "windows",
target_os = "freebsd"
))]
fn machine_uid_seed() -> Option<String> {
machine_uid::get()
.ok()
.filter(|value| !value.trim().is_empty())
}
#[cfg(not(any(
target_os = "linux",
all(target_os = "macos", not(feature = "macos-ne")),
target_os = "windows",
target_os = "freebsd"
)))]
fn machine_uid_seed() -> Option<String> {
None
}
#[cfg(target_os = "linux")]
fn linux_machine_id_seed(machine_uid: &str) -> String {
let mut seed = format!("machine_uid={machine_uid}");
let hostname = gethostname::gethostname()
.to_string_lossy()
.trim()
.to_string();
if !hostname.is_empty() {
seed.push_str("\nhostname=");
seed.push_str(&hostname);
}
let mac_addresses = collect_linux_mac_addresses();
if !mac_addresses.is_empty() {
seed.push_str("\nmacs=");
seed.push_str(&mac_addresses.join(","));
}
seed
}
#[cfg(target_os = "linux")]
fn collect_linux_mac_addresses() -> Vec<String> {
let mut macs = Vec::new();
let Ok(entries) = std::fs::read_dir("/sys/class/net") else {
return macs;
};
for entry in entries.flatten() {
let Ok(name) = entry.file_name().into_string() else {
continue;
};
if name == "lo" {
continue;
}
let address_path = entry.path().join("address");
let Ok(address) = std::fs::read_to_string(address_path) else {
continue;
};
let address = address.trim().to_ascii_lowercase();
if address.is_empty() || address == "00:00:00:00:00:00" {
continue;
}
macs.push(address);
}
macs.sort();
macs.dedup();
macs.truncate(3);
macs
}
fn persist_machine_id(path: &Path, machine_id: uuid::Uuid) -> anyhow::Result<uuid::Uuid> {
if let Some(existing) = read_state_machine_id(path)? {
return Ok(existing);
}
let _lock = MachineIdWriteLock::acquire(path)?;
if let Some(existing) = read_state_machine_id(path)? {
return Ok(existing);
}
write_uuid_file_atomically(path, machine_id)?;
Ok(machine_id)
}
fn write_uuid_file_atomically(path: &Path, machine_id: uuid::Uuid) -> anyhow::Result<()> {
let parent = path.parent().ok_or_else(|| {
anyhow::anyhow!(
"machine id state file {} has no parent directory",
path.display()
)
})?;
std::fs::create_dir_all(parent).with_context(|| {
format!(
"failed to create machine id state directory {}",
parent.display()
)
})?;
let tmp_path = parent.join(format!(
".machine_id.tmp-{}-{}",
std::process::id(),
uuid::Uuid::new_v4()
));
{
let mut file = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp_path)
.with_context(|| format!("failed to create {}", tmp_path.display()))?;
file.write_all(machine_id.to_string().as_bytes())
.with_context(|| format!("failed to write {}", tmp_path.display()))?;
file.sync_all()
.with_context(|| format!("failed to flush {}", tmp_path.display()))?;
}
if let Err(err) = std::fs::rename(&tmp_path, path) {
let _ = std::fs::remove_file(&tmp_path);
return Err(err).with_context(|| {
format!(
"failed to move machine id state file into place at {}",
path.display()
)
});
}
Ok(())
}
struct MachineIdWriteLock {
#[cfg(unix)]
_lock: Flock<std::fs::File>,
#[cfg(not(unix))]
path: PathBuf,
}
impl MachineIdWriteLock {
fn acquire(path: &Path) -> anyhow::Result<Self> {
let parent = path.parent().ok_or_else(|| {
anyhow::anyhow!(
"machine id state file {} has no parent directory",
path.display()
)
})?;
std::fs::create_dir_all(parent).with_context(|| {
format!(
"failed to create machine id state directory {}",
parent.display()
)
})?;
#[cfg(unix)]
{
Self::acquire_unix(path)
}
#[cfg(not(unix))]
{
Self::acquire_fallback(path)
}
}
#[cfg(unix)]
fn acquire_unix(path: &Path) -> anyhow::Result<Self> {
let lock_path = path.with_extension("lock");
let deadline = Instant::now() + Duration::from_secs(5);
let mut lock_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&lock_path)
.with_context(|| format!("failed to open machine id lock {}", lock_path.display()))?;
loop {
match Flock::lock(lock_file, FlockArg::LockExclusiveNonblock) {
Ok(lock) => return Ok(Self { _lock: lock }),
Err((file, Errno::EAGAIN)) => {
if Instant::now() >= deadline {
anyhow::bail!(
"timed out waiting for machine id lock {}",
lock_path.display()
);
}
lock_file = file;
std::thread::sleep(Duration::from_millis(50));
}
Err((_file, err)) => {
anyhow::bail!(
"failed to acquire machine id lock {}: {}",
lock_path.display(),
err
);
}
}
}
}
#[cfg(not(unix))]
fn acquire_fallback(path: &Path) -> anyhow::Result<Self> {
let lock_path = path.with_extension("lock");
let deadline = Instant::now() + Duration::from_secs(5);
loop {
match std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&lock_path)
{
Ok(mut file) => {
writeln!(file, "pid={}", std::process::id()).ok();
return Ok(Self { path: lock_path });
}
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
if should_reap_stale_lock_file(&lock_path) {
let _ = std::fs::remove_file(&lock_path);
continue;
}
if Instant::now() >= deadline {
anyhow::bail!(
"timed out waiting for machine id lock {}",
lock_path.display()
);
}
std::thread::sleep(Duration::from_millis(50));
}
Err(err) => {
return Err(err).with_context(|| {
format!("failed to acquire machine id lock {}", lock_path.display())
});
}
}
}
}
}
#[cfg(not(unix))]
fn should_reap_stale_lock_file(lock_path: &Path) -> bool {
const STALE_LOCK_AGE: Duration = Duration::from_secs(30);
let Ok(metadata) = std::fs::metadata(lock_path) else {
return false;
};
let Ok(modified) = metadata.modified() else {
return false;
};
modified
.elapsed()
.is_ok_and(|elapsed| elapsed >= STALE_LOCK_AGE)
}
impl Drop for MachineIdWriteLock {
fn drop(&mut self) {
#[cfg(not(unix))]
let _ = std::fs::remove_file(&self.path);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_resolve_machine_id_uses_uuid_seed_verbatim() {
let raw = "33333333-3333-3333-3333-333333333333".to_string();
let opts = MachineIdOptions {
explicit_machine_id: Some(raw.clone()),
state_dir: None,
};
assert_eq!(
resolve_machine_id(&opts).unwrap(),
uuid::Uuid::parse_str(&raw).unwrap()
);
}
#[test]
fn test_resolve_machine_id_reads_state_file() {
let temp_dir = tempfile::tempdir().unwrap();
let expected = uuid::Uuid::new_v4();
std::fs::write(temp_dir.path().join("machine_id"), expected.to_string()).unwrap();
let opts = MachineIdOptions {
explicit_machine_id: None,
state_dir: Some(temp_dir.path().to_path_buf()),
};
assert_eq!(resolve_machine_id(&opts).unwrap(), expected);
}
#[test]
fn test_read_legacy_machine_id_file_ignores_read_errors() {
let temp_dir = tempfile::tempdir().unwrap();
assert_eq!(read_legacy_machine_id_file_at(temp_dir.path()), None);
}
#[test]
fn test_write_uuid_file_atomically_writes_expected_contents() {
let temp_dir = tempfile::tempdir().unwrap();
let machine_id = uuid::Uuid::new_v4();
let state_file = temp_dir.path().join("machine_id");
write_uuid_file_atomically(&state_file, machine_id).unwrap();
assert_eq!(
std::fs::read_to_string(state_file).unwrap(),
machine_id.to_string()
);
}
#[test]
fn test_non_empty_os_string_filters_empty_values() {
assert_eq!(non_empty_os_string(Some(OsString::new())), None);
assert_eq!(
non_empty_os_string(Some(OsString::from("foo"))),
Some(OsString::from("foo"))
);
}
#[cfg(target_os = "linux")]
#[test]
fn test_default_linux_machine_id_state_dir_falls_back_in_order() {
assert_eq!(
default_linux_machine_id_state_dir(
Some(OsString::from("/tmp/xdg")),
Some(OsString::from("/tmp/home"))
),
PathBuf::from("/tmp/xdg").join("easytier")
);
assert_eq!(
default_linux_machine_id_state_dir(
Some(OsString::new()),
Some(OsString::from("/tmp/home"))
),
PathBuf::from("/tmp/home")
.join(".local")
.join("share")
.join("easytier")
);
assert_eq!(
default_linux_machine_id_state_dir(Some(OsString::new()), Some(OsString::new())),
PathBuf::from("/var/lib/easytier")
);
}
#[test]
fn test_persist_machine_id_creates_missing_state_dir() {
let temp_dir = tempfile::tempdir().unwrap();
let state_file = temp_dir.path().join("nested").join("machine_id");
let machine_id = uuid::Uuid::new_v4();
assert_eq!(
persist_machine_id(&state_file, machine_id).unwrap(),
machine_id
);
assert_eq!(
std::fs::read_to_string(state_file).unwrap(),
machine_id.to_string()
);
}
#[test]
fn test_legacy_machine_uid_migration_requires_existing_state_dir_content() {
let temp_dir = tempfile::tempdir().unwrap();
let missing_state_file = temp_dir.path().join("missing").join("machine_id");
assert!(!should_attempt_legacy_machine_uid_migration(
&missing_state_file
));
let empty_dir = temp_dir.path().join("empty");
std::fs::create_dir_all(&empty_dir).unwrap();
assert!(!should_attempt_legacy_machine_uid_migration(
&empty_dir.join("machine_id")
));
std::fs::write(empty_dir.join("config.toml"), "x=1").unwrap();
assert!(should_attempt_legacy_machine_uid_migration(
&empty_dir.join("machine_id")
));
}
}
+3 -76
View File
@@ -1,15 +1,12 @@
use std::{
fmt::Debug,
future,
io::Write as _,
sync::{Arc, Mutex},
};
use time::util::refresh_tz;
use tokio::{task::JoinSet, time::timeout};
use tracing::Instrument;
use crate::{set_global_var, use_global_var};
pub mod acl_processor;
pub mod compressor;
pub mod config;
@@ -21,6 +18,7 @@ pub mod global_ctx;
pub mod idn;
pub mod ifcfg;
pub mod log;
pub mod machine_id;
pub mod netns;
pub mod network;
pub mod os_info;
@@ -31,6 +29,8 @@ pub mod token_bucket;
pub mod tracing_rolling_appender;
pub mod upnp;
pub use machine_id::{MachineIdOptions, resolve_machine_id};
pub fn get_logger_timer<F: time::formatting::Formattable>(
format: F,
) -> tracing_subscriber::fmt::time::OffsetTime<F> {
@@ -96,71 +96,6 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
);
}
pub fn set_default_machine_id(mid: Option<String>) {
set_global_var!(MACHINE_UID, mid);
}
pub fn get_machine_id() -> uuid::Uuid {
if let Some(default_mid) = use_global_var!(MACHINE_UID) {
if let Ok(mid) = uuid::Uuid::parse_str(default_mid.trim()) {
return mid;
}
let mut b = [0u8; 16];
crate::tunnel::generate_digest_from_str("", &default_mid, &mut b);
return uuid::Uuid::from_bytes(b);
}
// a path same as the binary
let machine_id_file = std::env::current_exe()
.map(|x| x.with_file_name("et_machine_id"))
.unwrap_or_else(|_| std::path::PathBuf::from("et_machine_id"));
// try load from local file
if let Ok(mid) = std::fs::read_to_string(&machine_id_file)
&& let Ok(mid) = uuid::Uuid::parse_str(mid.trim())
{
return mid;
}
#[cfg(any(
target_os = "linux",
all(target_os = "macos", not(feature = "macos-ne")),
target_os = "windows",
target_os = "freebsd"
))]
let gen_mid = machine_uid::get()
.map(|x| {
if x.is_empty() {
return uuid::Uuid::new_v4();
}
let mut b = [0u8; 16];
crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b);
uuid::Uuid::from_bytes(b)
})
.ok();
#[cfg(not(any(
target_os = "linux",
all(target_os = "macos", not(feature = "macos-ne")),
target_os = "windows",
target_os = "freebsd"
)))]
let gen_mid = None;
if let Some(mid) = gen_mid {
return mid;
}
let gen_mid = uuid::Uuid::new_v4();
// try save to local file
if let Ok(mut file) = std::fs::File::create(machine_id_file) {
let _ = file.write_all(gen_mid.to_string().as_bytes());
}
gen_mid
}
pub fn shrink_dashmap<K: Eq + std::hash::Hash, V>(
map: &dashmap::DashMap<K, V>,
threshold: Option<usize>,
@@ -210,12 +145,4 @@ mod tests {
assert_eq!(weak_js.weak_count(), 0);
assert_eq!(weak_js.strong_count(), 0);
}
#[test]
fn test_get_machine_id_uses_uuid_seed_verbatim() {
let raw = "33333333-3333-3333-3333-333333333333".to_string();
set_default_machine_id(Some(raw.clone()));
assert_eq!(get_machine_id(), uuid::Uuid::parse_str(&raw).unwrap());
set_default_machine_id(None);
}
}
+22
View File
@@ -85,6 +85,15 @@ pub enum MetricName {
/// Traffic packets forwarded for foreign network, forward
TrafficPacketsForeignForwardForwarded,
/// UDP broadcast relay packets captured from the raw socket
UdpBroadcastRelayPacketsCaptured,
/// UDP broadcast relay packets ignored before forwarding
UdpBroadcastRelayPacketsIgnored,
/// UDP broadcast relay packets forwarded
UdpBroadcastRelayPacketsForwarded,
/// UDP broadcast relay packets that failed to forward
UdpBroadcastRelayPacketsForwardFailed,
/// Compression bytes before compression
CompressionBytesRxBefore,
/// Compression bytes after compression
@@ -167,6 +176,19 @@ impl fmt::Display for MetricName {
write!(f, "traffic_packets_foreign_forward_forwarded")
}
MetricName::UdpBroadcastRelayPacketsCaptured => {
write!(f, "udp_broadcast_relay_packets_captured")
}
MetricName::UdpBroadcastRelayPacketsIgnored => {
write!(f, "udp_broadcast_relay_packets_ignored")
}
MetricName::UdpBroadcastRelayPacketsForwarded => {
write!(f, "udp_broadcast_relay_packets_forwarded")
}
MetricName::UdpBroadcastRelayPacketsForwardFailed => {
write!(f, "udp_broadcast_relay_packets_forward_failed")
}
MetricName::CompressionBytesRxBefore => write!(f, "compression_bytes_rx_before"),
MetricName::CompressionBytesRxAfter => write!(f, "compression_bytes_rx_after"),
MetricName::CompressionBytesTxBefore => write!(f, "compression_bytes_tx_before"),
+162 -52
View File
@@ -1,6 +1,8 @@
use std::{
collections::BTreeSet,
future::Future,
sync::{Arc, Weak},
time::{Duration, Instant},
};
use dashmap::DashSet;
@@ -16,7 +18,7 @@ use crate::{
},
rpc_types::{self, controller::BaseController},
},
tunnel::{IpVersion, TunnelConnector},
tunnel::{IpVersion, TunnelConnector, TunnelScheme, matches_scheme},
utils::weak_upgrade,
};
@@ -83,6 +85,55 @@ impl ManualConnectorManager {
ret
}
fn reconnect_timeout(dead_url: &url::Url) -> Duration {
let use_long_timeout = matches_scheme!(
dead_url,
TunnelScheme::Http | TunnelScheme::Https | TunnelScheme::Txt | TunnelScheme::Srv
) || matches!(dead_url.scheme(), "ws" | "wss");
Duration::from_secs(if use_long_timeout { 20 } else { 2 })
}
fn remaining_budget(started_at: Instant, total_timeout: Duration) -> Option<Duration> {
let remaining = total_timeout.checked_sub(started_at.elapsed())?;
(!remaining.is_zero()).then_some(remaining)
}
fn emit_connect_error(
data: &ConnectorManagerData,
dead_url: &url::Url,
ip_version: IpVersion,
error: &Error,
) {
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
dead_url.to_string(),
format!("{:?}", ip_version),
format!("{:#?}", error),
));
}
fn reconnect_timeout_error(stage: &str, duration: Duration) -> Error {
Error::AnyhowError(anyhow::anyhow!("{} timeout after {:?}", stage, duration))
}
async fn with_reconnect_timeout<T, F>(
stage: &'static str,
started_at: Instant,
total_timeout: Duration,
fut: F,
) -> Result<T, Error>
where
F: Future<Output = Result<T, Error>>,
{
let remaining = Self::remaining_budget(started_at, total_timeout)
.ok_or_else(|| Self::reconnect_timeout_error(stage, started_at.elapsed()))?;
timeout(remaining, fut)
.await
.map_err(|_| Self::reconnect_timeout_error(stage, remaining))?
}
}
impl ManualConnectorManager {
pub fn add_connector<T>(&self, connector: T)
where
T: TunnelConnector + 'static,
@@ -242,11 +293,18 @@ impl ManualConnectorManager {
async fn conn_reconnect_with_ip_version(
data: Arc<ConnectorManagerData>,
dead_url: String,
dead_url: url::Url,
ip_version: IpVersion,
started_at: Instant,
total_timeout: Duration,
) -> Result<ReconnResult, Error> {
let connector =
create_connector_by_url(&dead_url, &data.global_ctx.clone(), ip_version).await?;
let connector = Self::with_reconnect_timeout(
"resolve",
started_at,
total_timeout,
create_connector_by_url(dead_url.as_str(), &data.global_ctx, ip_version),
)
.await?;
data.global_ctx
.issue_event(GlobalCtxEvent::Connecting(connector.remote_url()));
@@ -257,10 +315,25 @@ impl ManualConnectorManager {
)));
};
let (peer_id, conn_id) = pm.try_direct_connect(connector).await?;
let tunnel = Self::with_reconnect_timeout(
"connect",
started_at,
total_timeout,
pm.connect_tunnel(connector),
)
.await?;
let (peer_id, conn_id) = Self::with_reconnect_timeout(
"handshake",
started_at,
total_timeout,
pm.add_client_tunnel_with_peer_id_hint(tunnel, true, None),
)
.await?;
tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url);
Ok(ReconnResult {
dead_url,
dead_url: dead_url.to_string(),
peer_id,
conn_id,
})
@@ -273,22 +346,33 @@ impl ManualConnectorManager {
tracing::info!("reconnect: {}", dead_url);
let mut ip_versions = vec![];
if dead_url.scheme() == "ring" || dead_url.scheme() == "txt" || dead_url.scheme() == "srv" {
if matches_scheme!(
dead_url,
TunnelScheme::Ring | TunnelScheme::Txt | TunnelScheme::Srv
) {
ip_versions.push(IpVersion::Both);
} else {
let converted_dead_url = crate::common::idn::convert_idn_to_ascii(dead_url.clone())?;
let addrs = match socket_addrs(&converted_dead_url, || Some(1000)).await {
let converted_dead_url =
match crate::common::idn::convert_idn_to_ascii(dead_url.clone()) {
Ok(url) => url,
Err(error) => {
let error: Error = error.into();
Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error);
return Err(error);
}
};
let addrs = match Self::with_reconnect_timeout(
"resolve",
Instant::now(),
Self::reconnect_timeout(&dead_url),
socket_addrs(&converted_dead_url, || Some(1000)),
)
.await
{
Ok(addrs) => addrs,
Err(e) => {
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
dead_url.to_string(),
format!("{:?}", IpVersion::Both),
format!("{:?}", e),
));
return Err(Error::AnyhowError(anyhow::anyhow!(
"get ip from url failed: {:?}",
e
)));
Err(error) => {
Self::emit_connect_error(&data, &dead_url, IpVersion::Both, &error);
return Err(error);
}
};
tracing::info!(?addrs, ?dead_url, "get ip from url done");
@@ -313,46 +397,24 @@ impl ManualConnectorManager {
"cannot get ip from url"
)));
for ip_version in ip_versions {
let use_long_timeout = dead_url.scheme() == "http"
|| dead_url.scheme() == "https"
|| dead_url.scheme() == "ws"
|| dead_url.scheme() == "wss"
|| dead_url.scheme() == "txt"
|| dead_url.scheme() == "srv";
let ret = timeout(
// allow http/websocket connector to wait longer
std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }),
Self::conn_reconnect_with_ip_version(
data.clone(),
dead_url.to_string(),
ip_version,
),
let started_at = Instant::now();
let ret = Self::conn_reconnect_with_ip_version(
data.clone(),
dead_url.clone(),
ip_version,
started_at,
Self::reconnect_timeout(&dead_url),
)
.await;
tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret);
match ret {
Ok(Ok(_)) => {
// 外层和内层都成功:解包并跳出
reconn_ret = ret.unwrap();
break;
}
Ok(Err(e)) => {
// 外层成功,内层失败
reconn_ret = Err(e);
}
Err(e) => {
// 外层失败
reconn_ret = Err(e.into());
Ok(result) => return Ok(result),
Err(error) => {
Self::emit_connect_error(&data, &dead_url, ip_version, &error);
reconn_ret = Err(error);
}
}
// 发送事件(只有在未 break 时才执行)
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
dead_url.to_string(),
format!("{:?}", ip_version),
format!("{:?}", reconn_ret),
));
}
reconn_ret
@@ -388,6 +450,54 @@ mod tests {
use super::*;
#[tokio::test]
async fn reconnect_timeout_reports_exhausted_budget_for_stage() {
let started_at = Instant::now() - Duration::from_millis(50);
let err = ManualConnectorManager::with_reconnect_timeout(
"resolve",
started_at,
Duration::from_millis(1),
async { Ok::<(), Error>(()) },
)
.await
.unwrap_err();
let message = err.to_string();
assert!(message.contains("resolve timeout after"));
}
#[tokio::test]
async fn reconnect_timeout_reports_stage_timeout_with_remaining_budget() {
let err = ManualConnectorManager::with_reconnect_timeout(
"handshake",
Instant::now(),
Duration::from_millis(10),
async {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok::<(), Error>(())
},
)
.await
.unwrap_err();
let message = err.to_string();
assert!(message.contains("handshake timeout after"));
}
#[tokio::test]
async fn reconnect_timeout_preserves_success_within_budget() {
let result = ManualConnectorManager::with_reconnect_timeout(
"connect",
Instant::now(),
Duration::from_millis(50),
async { Ok::<_, Error>(123_u32) },
)
.await
.unwrap();
assert_eq!(result, 123);
}
#[tokio::test]
async fn test_reconnect_with_connecting_addr() {
set_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS, 1);
+16 -1
View File
@@ -484,6 +484,15 @@ struct NetworkOptions {
)]
disable_upnp: Option<bool>,
#[arg(
long,
env = "ET_ENABLE_UDP_BROADCAST_RELAY",
help = t!("core_clap.enable_udp_broadcast_relay").to_string(),
num_args = 0..=1,
default_missing_value = "true"
)]
enable_udp_broadcast_relay: Option<bool>,
#[arg(
long,
env = "ET_RELAY_ALL_PEER_RPC",
@@ -1142,6 +1151,9 @@ impl NetworkOptions {
.disable_sym_hole_punching
.unwrap_or(f.disable_sym_hole_punching);
f.disable_upnp = self.disable_upnp.unwrap_or(f.disable_upnp);
f.enable_udp_broadcast_relay = self
.enable_udp_broadcast_relay
.unwrap_or(f.enable_udp_broadcast_relay);
// Configure tld_dns_zone: use provided value if set
if let Some(tld_dns_zone) = &self.tld_dns_zone {
f.tld_dns_zone = tld_dns_zone.clone();
@@ -1336,7 +1348,10 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> {
let _web_client = if let Some(config_server_url_s) = cli.config_server.as_ref() {
let wc = web_client::run_web_client(
config_server_url_s,
cli.machine_id.clone(),
crate::common::MachineIdOptions {
explicit_machine_id: cli.machine_id.clone(),
state_dir: None,
},
cli.network_options.hostname.clone(),
cli.network_options.secure_mode.unwrap_or(false),
manager.clone(),
+20 -6
View File
@@ -193,8 +193,11 @@ struct PeerArgs {
#[derive(Subcommand, Debug)]
enum PeerSubCommand {
/// List connected peers
List,
/// Show public IPv6 address information
Ipv6,
/// List foreign networks discovered by this instance
ListForeign {
#[arg(
long,
@@ -203,6 +206,7 @@ enum PeerSubCommand {
)]
trusted_keys: bool,
},
/// List global foreign networks from the peer center
ListGlobalForeign,
}
@@ -214,16 +218,18 @@ struct RouteArgs {
#[derive(Subcommand, Debug)]
enum RouteSubCommand {
/// List routes propagated by peers
List,
/// Dump routes in CIDR format
Dump,
}
#[derive(Args, Debug)]
struct ConnectorArgs {
#[arg(short, long)]
#[arg(short, long, help = "filter connectors by virtual IPv4 address")]
ipv4: Option<String>,
#[arg(short, long)]
#[arg(short, long, help = "filter connectors by peer URL")]
peers: Vec<String>,
#[command(subcommand)]
@@ -242,6 +248,7 @@ enum ConnectorSubCommand {
#[arg(help = "connector url, e.g., tcp://1.2.3.4:11010")]
url: String,
},
/// List connectors
List,
}
@@ -283,6 +290,7 @@ struct AclArgs {
#[derive(Subcommand, Debug)]
enum AclSubCommand {
/// Show ACL rule hit statistics
Stats,
}
@@ -450,19 +458,25 @@ struct InstallArgs {
#[arg(long, default_value = env!("CARGO_PKG_DESCRIPTION"), help = "service description")]
description: String,
#[arg(long)]
#[arg(long, help = "display name shown by the service manager")]
display_name: Option<String>,
#[arg(long)]
#[arg(
long,
help = "whether to disable starting the service automatically on boot (true/false)"
)]
disable_autostart: Option<bool>,
#[arg(long)]
#[arg(
long,
help = "whether to disable automatic restart when the service fails (true/false)"
)]
disable_restart_on_failure: Option<bool>,
#[arg(long, help = "path to easytier-core binary")]
core_path: Option<PathBuf>,
#[arg(long)]
#[arg(long, help = "working directory for the easytier-core service")]
service_work_dir: Option<PathBuf>,
#[arg(
+3
View File
@@ -10,3 +10,6 @@ pub mod proxy_cidrs_monitor;
#[cfg(feature = "tun")]
pub mod virtual_nic;
#[cfg(any(windows, test))]
pub(crate) mod windows_udp_broadcast;
@@ -1,5 +1,8 @@
use std::{path::Path, sync::Arc};
#[cfg(target_os = "linux")]
use std::path::Path;
use std::sync::Arc;
#[cfg(target_os = "linux")]
use anyhow::Context;
use cidr::{Ipv6Cidr, Ipv6Inet};
#[cfg(target_os = "linux")]
@@ -321,7 +324,7 @@ async fn resolve_public_ipv6_provider_runtime_state_linux(
}
async fn resolve_public_ipv6_provider_runtime_state(
global_ctx: &ArcGlobalCtx,
_global_ctx: &ArcGlobalCtx,
config: PublicIpv6ProviderConfigSnapshot,
) -> PublicIpv6ProviderRuntimeState {
if !config.provider_enabled {
@@ -331,7 +334,7 @@ async fn resolve_public_ipv6_provider_runtime_state(
#[cfg(target_os = "linux")]
{
return resolve_public_ipv6_provider_runtime_state_linux(
global_ctx,
_global_ctx,
config.configured_prefix,
)
.await;
+35
View File
@@ -35,6 +35,8 @@ use tokio::{
task::JoinSet,
};
use tokio_util::bytes::Bytes;
#[cfg(target_os = "windows")]
use tokio_util::task::AbortOnDropHandle;
use tun::{AbstractDevice, AsyncDevice, Configuration, Layer};
use zerocopy::{NativeEndian, NetworkEndian};
@@ -801,6 +803,9 @@ pub struct NicCtx {
nic: Arc<Mutex<VirtualNic>>,
tasks: JoinSet<()>,
#[cfg(target_os = "windows")]
windows_udp_broadcast_relay: Option<AbortOnDropHandle<()>>,
}
impl NicCtx {
@@ -819,6 +824,9 @@ impl NicCtx {
nic: Arc::new(Mutex::new(VirtualNic::new(global_ctx))),
tasks: JoinSet::new(),
#[cfg(target_os = "windows")]
windows_udp_broadcast_relay: None,
}
}
@@ -1005,6 +1013,31 @@ impl NicCtx {
});
}
#[cfg(target_os = "windows")]
fn start_windows_udp_broadcast_relay(&mut self, virtual_ipv4: Ipv4Inet) {
if !self.global_ctx.get_flags().enable_udp_broadcast_relay {
return;
}
let Some(peer_manager) = self.peer_mgr.upgrade() else {
tracing::warn!("peer manager is dropped, skip Windows UDP broadcast relay");
return;
};
match super::windows_udp_broadcast::start(peer_manager, virtual_ipv4) {
Ok(handle) => {
self.windows_udp_broadcast_relay = Some(handle);
tracing::info!("Windows UDP broadcast relay started");
}
Err(err) => {
tracing::warn!(
?err,
"failed to start Windows UDP broadcast relay; administrator privileges are required"
);
}
}
}
async fn apply_route_changes(
ifcfg: &impl IfConfiguerTrait,
ifname: &str,
@@ -1347,6 +1380,8 @@ impl NicCtx {
// Assign IPv4 address if provided
if let Some(ipv4_addr) = ipv4_addr {
self.assign_ipv4_to_tun_device(ipv4_addr).await?;
#[cfg(target_os = "windows")]
self.start_windows_udp_broadcast_relay(ipv4_addr);
}
// Assign IPv6 address if provided
File diff suppressed because it is too large Load Diff
+22
View File
@@ -474,6 +474,28 @@ fn handle_event(
);
}
GlobalCtxEvent::UdpBroadcastRelayStartResult {
capture_backend,
error,
} => {
if let Some(error) = error {
event!(
warn,
?capture_backend,
%error,
"[{}] UDP broadcast relay start failed",
instance_id
);
} else {
event!(
info,
?capture_backend,
"[{}] UDP broadcast relay started",
instance_id
);
}
}
GlobalCtxEvent::CredentialChanged => {
event!(info, "[{}] credential changed", instance_id);
}
+6
View File
@@ -820,6 +820,10 @@ impl NetworkConfig {
flags.disable_relay_data = disable_relay_data;
}
if let Some(enable_udp_broadcast_relay) = self.enable_udp_broadcast_relay {
flags.enable_udp_broadcast_relay = enable_udp_broadcast_relay;
}
if let Some(disable_sym_hole_punching) = self.disable_sym_hole_punching {
flags.disable_sym_hole_punching = disable_sym_hole_punching;
}
@@ -995,6 +999,7 @@ impl NetworkConfig {
result.disable_udp_hole_punching = Some(flags.disable_udp_hole_punching);
result.disable_upnp = Some(flags.disable_upnp);
result.disable_relay_data = Some(flags.disable_relay_data);
result.enable_udp_broadcast_relay = Some(flags.enable_udp_broadcast_relay);
result.disable_sym_hole_punching = Some(flags.disable_sym_hole_punching);
result.enable_magic_dns = Some(flags.accept_dns);
result.mtu = Some(flags.mtu as i32);
@@ -1263,6 +1268,7 @@ mod tests {
flags.disable_tcp_hole_punching = rng.gen_bool(0.2);
flags.disable_udp_hole_punching = rng.gen_bool(0.2);
flags.disable_upnp = rng.gen_bool(0.2);
flags.enable_udp_broadcast_relay = rng.gen_bool(0.2);
flags.accept_dns = rng.gen_bool(0.6);
flags.mtu = rng.gen_range(1200..1500);
flags.private_mode = rng.gen_bool(0.3);
+125 -15
View File
@@ -6,11 +6,15 @@ in the future, with the help wo peer center we can forward packets of peers that
connected to any node in the local network.
*/
use std::{
sync::{Arc, Weak},
sync::{
Arc, Weak,
atomic::{AtomicBool, Ordering},
},
time::SystemTime,
};
use dashmap::{DashMap, DashSet};
use guarden::defer;
use tokio::{
sync::{
Mutex,
@@ -93,6 +97,7 @@ struct ForeignNetworkEntry {
stats_mgr: Arc<StatsManager>,
traffic_metrics: Arc<TrafficMetricRecorder>,
event_handler_started: AtomicBool,
tasks: Mutex<JoinSet<()>>,
@@ -160,10 +165,11 @@ impl ForeignNetworkEntry {
InstanceLabelKind::From,
)),
{
let peer_map = peer_map.clone();
let peer_map = Arc::downgrade(&peer_map);
move |peer_id| {
let peer_map = peer_map.clone();
async move {
let peer_map = peer_map.upgrade()?;
peer_map
.get_route_peer_info(peer_id)
.await
@@ -230,6 +236,7 @@ impl ForeignNetworkEntry {
stats_mgr,
traffic_metrics,
event_handler_started: AtomicBool::new(false),
tasks: Mutex::new(JoinSet::new()),
@@ -674,6 +681,8 @@ struct ForeignNetworkManagerData {
network_peer_last_update: DashMap<String, SystemTime>,
accessor: Arc<Box<dyn GlobalForeignNetworkAccessor>>,
lock: std::sync::Mutex<()>,
#[cfg(test)]
fail_next_add_peer_conn_after_entry_insert: AtomicBool,
}
impl ForeignNetworkManagerData {
@@ -732,6 +741,36 @@ impl ForeignNetworkManagerData {
shrink_dashmap(&self.network_peer_last_update, None);
}
fn remove_network_if_current(
&self,
network_name: &String,
expected_entry: &Weak<ForeignNetworkEntry>,
) {
let _l = self.lock.lock().unwrap();
let Some(expected_entry) = expected_entry.upgrade() else {
return;
};
let old = self
.network_peer_maps
.remove_if(network_name, |_, entry| Arc::ptr_eq(entry, &expected_entry));
let Some((_, old)) = old else {
return;
};
old.traffic_metrics.clear_peer_cache();
let to_remove_peers = old.peer_map.list_peers();
for p in to_remove_peers {
self.peer_network_map.remove_if(&p, |_, v| {
v.remove(network_name);
v.is_empty()
});
}
self.network_peer_last_update.remove(network_name);
shrink_dashmap(&self.peer_network_map, None);
shrink_dashmap(&self.network_peer_maps, None);
shrink_dashmap(&self.network_peer_last_update, None);
}
#[allow(clippy::too_many_arguments)]
async fn get_or_insert_entry(
&self,
@@ -874,6 +913,8 @@ impl ForeignNetworkManager {
network_peer_last_update: DashMap::new(),
accessor: Arc::new(accessor),
lock: std::sync::Mutex::new(()),
#[cfg(test)]
fail_next_add_peer_conn_after_entry_insert: AtomicBool::new(false),
});
let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new()));
@@ -891,6 +932,13 @@ impl ForeignNetworkManager {
}
}
#[cfg(test)]
fn fail_next_add_peer_conn_after_entry_insert(&self) {
self.data
.fail_next_add_peer_conn_after_entry_insert
.store(true, Ordering::Release);
}
pub fn get_network_peer_id(&self, network_name: &str) -> Option<PeerId> {
self.data
.network_peer_maps
@@ -939,6 +987,35 @@ impl ForeignNetworkManager {
)
.await;
defer!(rollback_new_entry => sync [
data = self.data.clone(),
network_name = entry.network.network_name.clone(),
peer_id = peer_conn.get_peer_id(),
should_rollback = new_added
] {
if should_rollback {
tracing::warn!(
%network_name,
"rollback newly added foreign network entry after add_peer_conn returned error"
);
data.remove_peer(peer_id, &network_name);
}
});
#[cfg(test)]
if self
.data
.fail_next_add_peer_conn_after_entry_insert
.swap(false, Ordering::AcqRel)
{
return Err(anyhow::anyhow!(
"injected add_peer_conn failure after foreign network entry insert"
)
.into());
}
self.ensure_event_handler_started(&entry);
let same_identity = entry.network == peer_network;
let peer_identity_type = peer_conn.get_peer_identity_type();
let credential_peer_trusted = peer_digest_empty
@@ -952,10 +1029,6 @@ impl ForeignNetworkManager {
|| credential_identity_mismatch
|| entry.my_peer_id != peer_conn.get_my_peer_id()
{
if new_added {
self.data
.remove_peer(peer_conn.get_peer_id(), &entry.network.network_name.clone());
}
let err = if entry.my_peer_id != peer_conn.get_my_peer_id() {
anyhow::anyhow!(
"my peer id not match. exp: {:?} real: {:?}, need retry connect",
@@ -980,9 +1053,7 @@ impl ForeignNetworkManager {
return Err(err.into());
}
if new_added {
self.start_event_handler(&entry).await;
} else if let Some(peer) = entry.peer_map.get_peer_by_id(peer_conn.get_peer_id()) {
if !new_added && let Some(peer) = entry.peer_map.get_peer_by_id(peer_conn.get_peer_id()) {
let direct_conns_len = peer.get_directly_connections().len();
let max_count = use_global_var!(MAX_DIRECT_CONNS_PER_PEER_IN_FOREIGN_NETWORK);
if direct_conns_len >= max_count as usize {
@@ -996,23 +1067,31 @@ impl ForeignNetworkManager {
}
entry.peer_map.add_new_peer_conn(peer_conn).await?;
let _ = rollback_new_entry.defuse();
Ok(())
}
async fn start_event_handler(&self, entry: &ForeignNetworkEntry) {
fn ensure_event_handler_started(&self, entry: &Arc<ForeignNetworkEntry>) {
if entry.event_handler_started.swap(true, Ordering::AcqRel) {
return;
}
let data = self.data.clone();
let network_name = entry.network.network_name.clone();
let traffic_metrics = entry.traffic_metrics.clone();
let entry_for_cleanup = Arc::downgrade(entry);
let traffic_metrics = Arc::downgrade(&entry.traffic_metrics);
let mut s = entry.global_ctx.subscribe();
self.tasks.lock().unwrap().spawn(async move {
while let Ok(e) = s.recv().await {
match &e {
GlobalCtxEvent::PeerRemoved(peer_id) => {
tracing::info!(?e, "remove peer from foreign network manager");
traffic_metrics.remove_peer(*peer_id);
data.remove_peer(*peer_id, &network_name);
if let Some(traffic_metrics) = traffic_metrics.upgrade() {
traffic_metrics.remove_peer(*peer_id);
}
data.network_peer_last_update
.insert(network_name.clone(), SystemTime::now());
data.remove_peer(*peer_id, &network_name);
}
GlobalCtxEvent::PeerConnRemoved(..) => {
tracing::info!(?e, "clear no conn peer from foreign network manager");
@@ -1028,8 +1107,10 @@ impl ForeignNetworkManager {
}
// if lagged or recv done just remove the network
tracing::error!("global event handler at foreign network manager exit");
traffic_metrics.clear_peer_cache();
data.remove_network(&network_name);
if let Some(traffic_metrics) = traffic_metrics.upgrade() {
traffic_metrics.clear_peer_cache();
}
data.remove_network_if_current(&network_name, &entry_for_cleanup);
});
}
@@ -1615,6 +1696,35 @@ pub mod tests {
.await;
}
#[tokio::test]
async fn failed_new_foreign_peer_conn_rolls_back_entry_maps() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
let foreign_mgr = pm_center.get_foreign_network_manager();
foreign_mgr.fail_next_add_peer_conn_after_entry_insert();
let (a_ring, b_ring) = crate::tunnel::ring::create_ring_tunnel_pair();
let (client_ret, server_ret) = tokio::time::timeout(Duration::from_secs(5), async {
tokio::join!(
pma_net1.add_client_tunnel(a_ring, false),
pm_center.add_tunnel_as_server(b_ring, true)
)
})
.await
.unwrap();
assert!(client_ret.is_ok());
assert!(server_ret.is_err());
assert!(foreign_mgr.data.get_network_entry("net1").is_none());
assert!(
foreign_mgr
.data
.get_peer_network(pma_net1.my_peer_id())
.is_none()
);
}
#[tokio::test]
async fn foreign_network_peer_removed_clears_traffic_metric_peer_cache() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
+54 -12
View File
@@ -636,20 +636,27 @@ impl PeerManager {
#[tracing::instrument]
pub async fn try_direct_connect_with_peer_id_hint<C>(
&self,
mut connector: C,
connector: C,
peer_id_hint: Option<PeerId>,
) -> Result<(PeerId, PeerConnId), Error>
where
C: TunnelConnector + Debug,
{
let ns = self.global_ctx.net_ns.clone();
let t = ns
.run_async(|| async move { connector.connect().await })
.await?;
let t = self.connect_tunnel(connector).await?;
self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint)
.await
}
pub(crate) async fn connect_tunnel<C>(&self, mut connector: C) -> Result<Box<dyn Tunnel>, Error>
where
C: TunnelConnector + Debug,
{
let ns = self.global_ctx.net_ns.clone();
Ok(ns
.run_async(|| async move { connector.connect().await })
.await?)
}
// avoid loop back to virtual network
fn check_remote_addr_not_from_virtual_network(
&self,
@@ -1562,17 +1569,26 @@ impl PeerManager {
ipv6_addr.is_multicast() || *ipv6_addr == ipv6_inet.last_address()
}
fn select_ipv4_broadcast_peers<'a>(
routes: impl IntoIterator<Item = &'a instance::Route>,
my_peer_id: PeerId,
) -> Vec<PeerId> {
routes
.into_iter()
.filter_map(|route| {
(route.peer_id != my_peer_id && route.ipv4_addr.is_some()).then_some(route.peer_id)
})
.collect()
}
pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec<PeerId>, bool) {
let mut is_exit_node = false;
let mut dst_peers = vec![];
if self.is_all_peers_broadcast_ipv4(ipv4_addr) {
dst_peers.extend(self.peers.list_routes().await.iter().filter_map(|x| {
if *x.key() != self.my_peer_id {
Some(*x.key())
} else {
None
}
}));
dst_peers.extend(Self::select_ipv4_broadcast_peers(
&self.peers.list_route_infos().await,
self.my_peer_id,
));
} else if let Some(peer_id) = self.peers.get_peer_id_by_ipv4(ipv4_addr).await {
dst_peers.push(peer_id);
} else if !self
@@ -2192,6 +2208,32 @@ mod tests {
assert!(!PeerManager::should_mark_recent_traffic_for_fanout(2));
}
fn route_with_ipv4(
peer_id: u32,
ipv4_addr: Option<std::net::Ipv4Addr>,
) -> crate::proto::api::instance::Route {
crate::proto::api::instance::Route {
peer_id,
ipv4_addr: ipv4_addr.map(|addr| cidr::Ipv4Inet::new(addr, 24).unwrap().into()),
..Default::default()
}
}
#[test]
fn ipv4_broadcast_peer_selection_skips_peers_without_ipv4() {
let routes = vec![
route_with_ipv4(1, Some(std::net::Ipv4Addr::new(10, 126, 126, 1))),
route_with_ipv4(2, None),
route_with_ipv4(3, Some(std::net::Ipv4Addr::new(10, 126, 126, 3))),
route_with_ipv4(4, None),
];
assert_eq!(
PeerManager::select_ipv4_broadcast_peers(&routes, 3),
vec![1]
);
}
#[test]
fn gc_recent_traffic_removes_expired_and_connected_entries() {
let stale_peer = 1;
+1
View File
@@ -100,6 +100,7 @@ message NetworkConfig {
optional bool ipv6_public_addr_auto = 63;
optional string ipv6_public_addr_prefix = 64;
optional bool disable_relay_data = 65;
optional bool enable_udp_broadcast_relay = 66;
}
message PortForwardConfig {
+1
View File
@@ -76,6 +76,7 @@ message FlagsInConfig {
uint64 instance_recv_bps_limit = 39;
bool disable_upnp = 40;
bool disable_relay_data = 41;
bool enable_udp_broadcast_relay = 42;
}
message RpcDescriptor {
+26
View File
@@ -115,6 +115,12 @@ impl<R> FramedReader<R> {
return Some(Err(TunnelError::InvalidPacket("body too long".to_string())));
}
if body_len < PEER_MANAGER_HEADER_SIZE {
return Some(Err(TunnelError::InvalidPacket(
"body too short".to_string(),
)));
}
if buf.len() < TCP_TUNNEL_HEADER_SIZE + body_len {
// body is not complete
return None;
@@ -555,6 +561,26 @@ pub mod tests {
tunnel::{TunnelConnector, TunnelListener, packet_def::ZCPacket},
};
#[cfg(test)]
use crate::tunnel::{
TunnelError,
packet_def::{PEER_MANAGER_HEADER_SIZE, TCP_TUNNEL_HEADER_SIZE},
};
#[test]
fn framed_reader_rejects_short_peer_manager_body() {
let mut buf = BytesMut::new();
buf.put_u32_le((PEER_MANAGER_HEADER_SIZE - 1) as u32);
buf.resize(TCP_TUNNEL_HEADER_SIZE + PEER_MANAGER_HEADER_SIZE - 1, 0);
let ret = super::FramedReader::<tokio::io::Empty>::extract_one_packet(&mut buf, 2000);
assert!(matches!(
ret,
Some(Err(TunnelError::InvalidPacket(msg))) if msg == "body too short"
));
}
pub async fn _tunnel_echo_server(tunnel: Box<dyn super::Tunnel>, once: bool) {
let (mut recv, mut send) = tunnel.split();
+7
View File
@@ -9,6 +9,7 @@ use crate::{
pub struct Controller {
token: String,
machine_id: uuid::Uuid,
hostname: String,
device_os: DeviceOsInfo,
manager: Arc<NetworkInstanceManager>,
@@ -18,6 +19,7 @@ pub struct Controller {
impl Controller {
pub fn new(
token: String,
machine_id: uuid::Uuid,
hostname: String,
device_os: DeviceOsInfo,
manager: Arc<NetworkInstanceManager>,
@@ -25,6 +27,7 @@ impl Controller {
) -> Self {
Controller {
token,
machine_id,
hostname,
device_os,
manager,
@@ -44,6 +47,10 @@ impl Controller {
self.hostname.clone()
}
pub fn machine_id(&self) -> uuid::Uuid {
self.machine_id
}
pub fn device_os(&self) -> DeviceOsInfo {
self.device_os.clone()
}
+19 -6
View File
@@ -2,11 +2,12 @@ use std::sync::Arc;
use crate::{
common::{
MachineIdOptions,
config::TomlConfigLoader,
global_ctx::{ArcGlobalCtx, GlobalCtx},
log,
os_info::collect_device_os_info,
set_default_machine_id,
resolve_machine_id,
stun::MockStunInfoCollector,
},
connector::create_connector_by_url,
@@ -81,6 +82,7 @@ impl WebClient {
pub fn new<T: TunnelConnector + 'static, S: ToString, H: ToString>(
connector: T,
token: S,
machine_id: Uuid,
hostname: H,
secure_mode: bool,
manager: Arc<NetworkInstanceManager>,
@@ -90,6 +92,7 @@ impl WebClient {
let hooks = hooks.unwrap_or_else(|| Arc::new(DefaultHooks));
let controller = Arc::new(controller::Controller::new(
token.to_string(),
machine_id,
hostname.to_string(),
collect_device_os_info(),
manager,
@@ -229,13 +232,14 @@ impl WebClient {
pub async fn run_web_client(
config_server_url_s: &str,
machine_id: Option<String>,
machine_id_opts: MachineIdOptions,
hostname: Option<String>,
secure_mode: bool,
manager: Arc<NetworkInstanceManager>,
hooks: Option<Arc<dyn WebClientHooks>>,
) -> Result<WebClient> {
set_default_machine_id(machine_id);
let machine_id = resolve_machine_id(&machine_id_opts)
.with_context(|| "failed to resolve machine id for web client")?;
let config_server_url = match Url::parse(config_server_url_s) {
Ok(u) => u,
Err(_) => format!(
@@ -289,6 +293,7 @@ pub async fn run_web_client(
global_ctx,
},
token.to_string(),
machine_id,
hostname,
secure_mode,
manager,
@@ -300,14 +305,18 @@ pub async fn run_web_client(
mod tests {
use std::sync::{Arc, atomic::AtomicBool};
use crate::instance_manager::NetworkInstanceManager;
use crate::{common::MachineIdOptions, instance_manager::NetworkInstanceManager};
#[tokio::test]
async fn test_manager_wait() {
let manager = Arc::new(NetworkInstanceManager::new());
let temp_dir = tempfile::tempdir().unwrap();
let client = super::run_web_client(
format!("ring://{}/test", uuid::Uuid::new_v4()).as_str(),
None,
MachineIdOptions {
explicit_machine_id: None,
state_dir: Some(temp_dir.path().to_path_buf()),
},
None,
false,
manager.clone(),
@@ -335,9 +344,13 @@ mod tests {
#[tokio::test]
async fn test_run_web_client_with_unreachable_config_server() {
let manager = Arc::new(NetworkInstanceManager::new());
let temp_dir = tempfile::tempdir().unwrap();
let client = super::run_web_client(
"udp://config-server.invalid:22020/test",
None,
MachineIdOptions {
explicit_machine_id: None,
state_dir: Some(temp_dir.path().to_path_buf()),
},
None,
false,
manager,
+48 -7
View File
@@ -101,7 +101,11 @@ impl TunnelFilter for SecureDatagramTunnelFilter {
Err(e) => return Some(Err(e)),
};
let mut cipher = ZCPacket::new_with_payload(packet.payload());
let payload = match checked_payload(&packet, "secure packet") {
Ok(v) => v,
Err(e) => return Some(Err(e)),
};
let mut cipher = ZCPacket::new_with_payload(payload);
cipher.fill_peer_manager_hdr(0, 0, PacketType::Data as u8);
cipher
.mut_peer_manager_header()
@@ -116,15 +120,27 @@ impl TunnelFilter for SecureDatagramTunnelFilter {
))));
}
Some(Ok(ZCPacket::new_from_buf(
cipher.payload_bytes(),
ZCPacketType::DummyTunnel,
)))
let packet = ZCPacket::new_from_buf(cipher.payload_bytes(), ZCPacketType::DummyTunnel);
if packet.peer_manager_header().is_none() {
return Some(Err(TunnelError::InvalidPacket(
"decrypted secure packet too short".to_string(),
)));
}
Some(Ok(packet))
}
fn filter_output(&self) {}
}
fn checked_payload<'a>(packet: &'a ZCPacket, context: &str) -> Result<&'a [u8], TunnelError> {
if packet.peer_manager_header().is_none() {
return Err(TunnelError::InvalidPacket(format!("{context} too short")));
}
Ok(packet.payload())
}
fn pack_control_packet(payload: &[u8]) -> ZCPacket {
let mut packet = ZCPacket::new_with_payload(payload);
packet.fill_peer_manager_hdr(0, 0, PacketType::Data as u8);
@@ -214,7 +230,8 @@ pub async fn upgrade_client_tunnel(
Ok(None) => return Err(TunnelError::Shutdown),
Err(error) => return Err(error.into()),
};
let msg2_cipher = decode_noise_payload(msg2_packet.payload())
let msg2_payload = checked_payload(&msg2_packet, "noise msg2 packet")?;
let msg2_cipher = decode_noise_payload(msg2_payload)
.ok_or_else(|| TunnelError::InvalidPacket("invalid noise msg2 magic".to_string()))?;
let mut root_key_buf = [0u8; 32];
let root_key_len = state
@@ -254,7 +271,8 @@ pub async fn accept_or_upgrade_server_tunnel(
));
}
};
let Some(msg1_cipher) = decode_noise_payload(first_packet.payload()) else {
let first_payload = checked_payload(&first_packet, "first packet")?;
let Some(msg1_cipher) = decode_noise_payload(first_payload) else {
let stream = Box::pin(futures::stream::once(async move { Ok(first_packet) }).chain(stream));
return Ok((
Box::new(RawSplitTunnel::new(info, stream, sink)) as Box<dyn Tunnel>,
@@ -303,6 +321,7 @@ pub async fn accept_or_upgrade_server_tunnel(
mod tests {
use super::*;
use crate::tunnel::ring::create_ring_tunnel_pair;
use bytes::BytesMut;
#[test]
fn web_secure_cipher_algorithm_matches_support_flag() {
@@ -338,6 +357,28 @@ mod tests {
assert!(matches!(err, TunnelError::Timeout(_)));
}
#[tokio::test]
async fn accept_secure_tunnel_rejects_short_first_packet() {
let (server_tunnel, client_tunnel) = create_ring_tunnel_pair();
let server_task =
tokio::spawn(async move { accept_or_upgrade_server_tunnel(server_tunnel).await });
let (_stream, mut sink) = client_tunnel.split();
sink.send(ZCPacket::new_from_buf(
BytesMut::from(&b"\x01"[..]),
ZCPacketType::TCP,
))
.await
.unwrap();
let err = server_task.await.unwrap().unwrap_err();
assert!(matches!(
err,
TunnelError::InvalidPacket(msg) if msg == "first packet too short"
));
}
#[tokio::test]
async fn accept_secure_tunnel_after_short_client_delay() {
let (server_tunnel, client_tunnel) = create_ring_tunnel_pair();
+7 -5
View File
@@ -7,7 +7,7 @@ use tokio::{
};
use crate::{
common::{constants::EASYTIER_VERSION, get_machine_id},
common::constants::EASYTIER_VERSION,
proto::{
rpc_impl::bidirect::BidirectRpcManager,
rpc_types::controller::BaseController,
@@ -65,11 +65,13 @@ impl Session {
tasks: &mut JoinSet<()>,
ctx: HeartbeatCtx,
) {
let mid = get_machine_id();
let controller = controller.upgrade().unwrap();
let mid = controller.machine_id();
let inst_id = uuid::Uuid::new_v4();
let token = controller.upgrade().unwrap().token();
let hostname = controller.upgrade().unwrap().hostname();
let device_os = controller.upgrade().unwrap().device_os();
let token = controller.token();
let hostname = controller.hostname();
let device_os = controller.device_os();
let controller = Arc::downgrade(&controller);
let ctx_clone = ctx.clone();
let mut tick = interval(std::time::Duration::from_secs(1));