Compare commits

..

7 Commits

Author SHA1 Message Date
Sijie.Sun c23b544c34 tcp accept should retry when encoutering some kinds of error (#565)
* tcp accept should retry when encoutering some kinds of error

bump version to v2.1.2

* persistent temporary machine id
2025-01-14 08:55:48 +08:00
Sijie.Sun 9d76b86f49 fix bugs (#561)
1. if peers disconnected before stop session, may crash at the assert.
2. bind_device flag should take effect on manual connector.
2025-01-12 00:16:38 +08:00
Sijie.Sun bb0ccca3e5 allow manually specify public address of listeners (#556) 2025-01-10 09:25:14 +08:00
Sijie.Sun 306817ae9a allow listener retry listen (#554) 2025-01-09 00:01:41 +08:00
Sijie.Sun d2ec60e108 batch recv for udp proxy (#552) 2025-01-07 23:52:18 +08:00
Sijie.Sun e016aeddeb optimize pingpong conn close condition (#549)
if received some packets from peer, only close conn after enough
rounds of pingpong
2025-01-07 22:42:57 +08:00
Sijie.Sun a4419a31fd fix peer rpc compatibility issue (#548)
every rpc packet should contains descriptor if sent to old version et.
2025-01-06 23:30:56 +08:00
25 changed files with 591 additions and 309 deletions
+1 -1
View File
@@ -21,7 +21,7 @@ on:
version: version:
description: 'Version for this release' description: 'Version for this release'
type: string type: string
default: 'v2.1.1' default: 'v2.1.2'
required: true required: true
make_latest: make_latest:
description: 'Mark this release as latest' description: 'Mark this release as latest'
Generated
+2 -2
View File
@@ -1830,7 +1830,7 @@ checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
[[package]] [[package]]
name = "easytier" name = "easytier"
version = "2.1.1" version = "2.1.2"
dependencies = [ dependencies = [
"aes-gcm", "aes-gcm",
"anyhow", "anyhow",
@@ -1926,7 +1926,7 @@ dependencies = [
[[package]] [[package]]
name = "easytier-gui" name = "easytier-gui"
version = "2.1.1" version = "2.1.2"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",
+7 -8
View File
@@ -31,6 +31,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
- **High Availability**: Supports multi-path and switches to healthy paths when high packet loss or network errors are detected. - **High Availability**: Supports multi-path and switches to healthy paths when high packet loss or network errors are detected.
- **IPv6 Support**: Supports networking using IPv6. - **IPv6 Support**: Supports networking using IPv6.
- **Multiple Protocol Types**: Supports communication between nodes using protocols such as WebSocket and QUIC. - **Multiple Protocol Types**: Supports communication between nodes using protocols such as WebSocket and QUIC.
- **Web Management Interface**: Provides a [web-based management](https://easytier.cn/web) interface for easy configuration and monitoring.
## Installation ## Installation
@@ -52,7 +53,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
4. **Install by Docker Compose** 4. **Install by Docker Compose**
Please visit the [EasyTier Official Website](https://www.easytier.top/en/) to view the full documentation. Please visit the [EasyTier Official Website](https://www.easytier.cn/en/) to view the full documentation.
5. **Install by script (For Linux Only)** 5. **Install by script (For Linux Only)**
@@ -200,20 +201,20 @@ Subnet proxy information will automatically sync to each node in the virtual net
### Networking without Public IP ### Networking without Public IP
EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.top:11010``. EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.cn:11010``.
When using shared nodes, each node entering the network needs to provide the same ``--network-name`` and ``--network-secret`` parameters as the unique identifier of the network. When using shared nodes, each node entering the network needs to provide the same ``--network-name`` and ``--network-secret`` parameters as the unique identifier of the network.
Taking two nodes as an example, Node A executes: Taking two nodes as an example, Node A executes:
```sh ```sh
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010 sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
``` ```
Node B executes Node B executes
```sh ```sh
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010 sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
``` ```
After the command is successfully executed, Node A can access Node B through the virtual IP 10.144.144.2. After the command is successfully executed, Node A can access Node B through the virtual IP 10.144.144.2.
@@ -286,7 +287,7 @@ Run you own public server cluster is exactly same as running an virtual network,
You can also join the official public server cluster with following command: You can also join the official public server cluster with following command:
``` ```
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010 sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
``` ```
@@ -296,10 +297,8 @@ You can use ``easytier-core --help`` to view all configuration items
## Roadmap ## Roadmap
- [ ] Improve documentation and user guides. - [ ] Support features such TCP hole punching, KCP, FEC etc.
- [ ] Support features such as encryption, TCP hole punching, etc.
- [ ] Support iOS. - [ ] Support iOS.
- [ ] Support Web configuration management.
## Community and Contribution ## Community and Contribution
+8 -8
View File
@@ -8,7 +8,7 @@
[简体中文](/README_CN.md) | [English](/README.md) [简体中文](/README_CN.md) | [English](/README.md)
**请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。** **请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。**
一个简单、安全、去中心化的内网穿透 VPN 组网方案,使用 Rust 语言和 Tokio 框架实现。 一个简单、安全、去中心化的内网穿透 VPN 组网方案,使用 Rust 语言和 Tokio 框架实现。
@@ -31,6 +31,7 @@
- **高可用性**:支持多路径和在检测到高丢包率或网络错误时切换到健康路径。 - **高可用性**:支持多路径和在检测到高丢包率或网络错误时切换到健康路径。
- **IPV6 支持**:支持利用 IPV6 组网。 - **IPV6 支持**:支持利用 IPV6 组网。
- **多协议类型**: 支持使用 WebSocket、QUIC 等协议进行节点间通信。 - **多协议类型**: 支持使用 WebSocket、QUIC 等协议进行节点间通信。
- **Web 管理界面**:支持通过 [Web 界面](https://easytier.cn)管理节点。
## 安装 ## 安装
@@ -52,7 +53,7 @@
4. **通过Docker Compose安装** 4. **通过Docker Compose安装**
请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。 请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。
5. **使用一键脚本安装 (仅适用于 Linux)** 5. **使用一键脚本安装 (仅适用于 Linux)**
@@ -199,20 +200,20 @@ sudo easytier-core --ipv4 10.144.144.2 -n 10.1.1.0/24
### 无公网IP组网 ### 无公网IP组网
EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.top:11010``。 EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.cn:11010``。
使用共享节点时,需要每个入网节点提供相同的 ``--network-name`` 和 ``--network-secret`` 参数,作为网络的唯一标识。 使用共享节点时,需要每个入网节点提供相同的 ``--network-name`` 和 ``--network-secret`` 参数,作为网络的唯一标识。
以双节点为例,节点 A 执行: 以双节点为例,节点 A 执行:
```sh ```sh
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010 sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
``` ```
节点 B 执行 节点 B 执行
```sh ```sh
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010 sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
``` ```
命令执行成功后,节点 A 即可通过虚拟 IP 10.144.144.2 访问节点 B。 命令执行成功后,节点 A 即可通过虚拟 IP 10.144.144.2 访问节点 B。
@@ -289,7 +290,7 @@ connected_clients:
也可以使用以下命令加入官方公共服务器集群,后续将实现公共服务器集群的节点间负载均衡: 也可以使用以下命令加入官方公共服务器集群,后续将实现公共服务器集群的节点间负载均衡:
``` ```
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010 sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
``` ```
### 其他配置 ### 其他配置
@@ -299,9 +300,8 @@ sudo easytier-core --network-name easytier --network-secret easytier -p tcp://pu
## 路线图 ## 路线图
- [ ] 完善文档和用户指南。 - [ ] 完善文档和用户指南。
- [ ] 支持 TCP 打洞等特性。 - [ ] 支持 TCP 打洞、KCP、FEC 等特性。
- [ ] 支持 iOS。 - [ ] 支持 iOS。
- [ ] 支持 Web 配置管理。
## 社区和贡献 ## 社区和贡献
+1 -1
View File
@@ -1,7 +1,7 @@
{ {
"name": "easytier-gui", "name": "easytier-gui",
"type": "module", "type": "module",
"version": "2.1.1", "version": "2.1.2",
"private": true, "private": true,
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4", "packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
"scripts": { "scripts": {
+1 -1
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "easytier-gui" name = "easytier-gui"
version = "2.1.1" version = "2.1.2"
description = "EasyTier GUI" description = "EasyTier GUI"
authors = ["you"] authors = ["you"]
edition = "2021" edition = "2021"
+1 -1
View File
@@ -17,7 +17,7 @@
"createUpdaterArtifacts": false "createUpdaterArtifacts": false
}, },
"productName": "easytier-gui", "productName": "easytier-gui",
"version": "2.1.1", "version": "2.1.2",
"identifier": "com.kkrainbow.easytier", "identifier": "com.kkrainbow.easytier",
"plugins": {}, "plugins": {},
"app": { "app": {
+1 -1
View File
@@ -3,7 +3,7 @@ name = "easytier"
description = "A full meshed p2p VPN, connecting all your devices in one network with one command." description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
homepage = "https://github.com/EasyTier/EasyTier" homepage = "https://github.com/EasyTier/EasyTier"
repository = "https://github.com/EasyTier/EasyTier" repository = "https://github.com/EasyTier/EasyTier"
version = "2.1.1" version = "2.1.2"
edition = "2021" edition = "2021"
authors = ["kkrainbow"] authors = ["kkrainbow"]
keywords = ["vpn", "p2p", "network", "easytier"] keywords = ["vpn", "p2p", "network", "easytier"]
+6
View File
@@ -134,6 +134,12 @@ core_clap:
compression: compression:
en: "compression algorithm to use, support none, zstd. default is none" en: "compression algorithm to use, support none, zstd. default is none"
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none" zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none"
mapped_listeners:
en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple."
zh-CN: "手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。例如:tcp://123.123.123.123:11223,可以指定多个。"
bind_device:
en: "bind the connector socket to physical devices to avoid routing issues. e.g.: subnet proxy segment conflicts with a node's segment, after binding the physical device, it can communicate with the node normally."
zh-CN: "将连接器的套接字绑定到物理设备以避免路由问题。比如子网代理网段与某节点的网段冲突,绑定物理设备后可以与该节点正常通信。"
core_app: core_app:
panic_backtrace_save: panic_backtrace_save:
+19 -1
View File
@@ -27,8 +27,9 @@ pub fn gen_default_flags() -> Flags {
relay_all_peer_rpc: false, relay_all_peer_rpc: false,
disable_udp_hole_punching: false, disable_udp_hole_punching: false,
ipv6_listener: "udp://[::]:0".to_string(), ipv6_listener: "udp://[::]:0".to_string(),
multi_thread: false, multi_thread: true,
data_compress_algo: CompressionAlgoPb::None.into(), data_compress_algo: CompressionAlgoPb::None.into(),
bind_device: true,
} }
} }
@@ -72,6 +73,9 @@ pub trait ConfigLoader: Send + Sync {
fn get_listeners(&self) -> Vec<url::Url>; fn get_listeners(&self) -> Vec<url::Url>;
fn set_listeners(&self, listeners: Vec<url::Url>); fn set_listeners(&self, listeners: Vec<url::Url>);
fn get_mapped_listeners(&self) -> Vec<url::Url>;
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>);
fn get_rpc_portal(&self) -> Option<SocketAddr>; fn get_rpc_portal(&self) -> Option<SocketAddr>;
fn set_rpc_portal(&self, addr: SocketAddr); fn set_rpc_portal(&self, addr: SocketAddr);
@@ -183,6 +187,7 @@ struct Config {
dhcp: Option<bool>, dhcp: Option<bool>,
network_identity: Option<NetworkIdentity>, network_identity: Option<NetworkIdentity>,
listeners: Option<Vec<url::Url>>, listeners: Option<Vec<url::Url>>,
mapped_listeners: Option<Vec<url::Url>>,
exit_nodes: Option<Vec<Ipv4Addr>>, exit_nodes: Option<Vec<Ipv4Addr>>,
peer: Option<Vec<PeerConfig>>, peer: Option<Vec<PeerConfig>>,
@@ -472,6 +477,19 @@ impl ConfigLoader for TomlConfigLoader {
self.config.lock().unwrap().listeners = Some(listeners); self.config.lock().unwrap().listeners = Some(listeners);
} }
fn get_mapped_listeners(&self) -> Vec<url::Url> {
self.config
.lock()
.unwrap()
.mapped_listeners
.clone()
.unwrap_or_default()
}
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>) {
self.config.lock().unwrap().mapped_listeners = listeners;
}
fn get_rpc_portal(&self) -> Option<SocketAddr> { fn get_rpc_portal(&self) -> Option<SocketAddr> {
self.config.lock().unwrap().rpc_portal self.config.lock().unwrap().rpc_portal
} }
+4 -1
View File
@@ -230,7 +230,10 @@ impl GlobalCtx {
} }
pub fn add_running_listener(&self, url: url::Url) { pub fn add_running_listener(&self, url: url::Url) {
self.running_listeners.lock().unwrap().push(url); let mut l = self.running_listeners.lock().unwrap();
if !l.contains(&url) {
l.push(url);
}
} }
pub fn get_vpn_portal_cidr(&self) -> Option<cidr::Ipv4Cidr> { pub fn get_vpn_portal_cidr(&self) -> Option<cidr::Ipv4Cidr> {
+23 -3
View File
@@ -1,6 +1,7 @@
use std::{ use std::{
fmt::Debug, fmt::Debug,
future, future,
io::Write as _,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use tokio::task::JoinSet; use tokio::task::JoinSet;
@@ -81,7 +82,17 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
} }
pub fn get_machine_id() -> uuid::Uuid { pub fn get_machine_id() -> uuid::Uuid {
// TODO: load from local file // a path same as the binary
let machine_id_file = std::env::current_exe()
.map(|x| x.with_file_name("et_machine_id"))
.unwrap_or_else(|_| std::path::PathBuf::from("et_machine_id"));
// try load from local file
if let Ok(mid) = std::fs::read_to_string(&machine_id_file) {
if let Ok(mid) = uuid::Uuid::parse_str(mid.trim()) {
return mid;
}
}
#[cfg(any( #[cfg(any(
target_os = "linux", target_os = "linux",
@@ -95,7 +106,7 @@ pub fn get_machine_id() -> uuid::Uuid {
crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b); crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b);
uuid::Uuid::from_bytes(b) uuid::Uuid::from_bytes(b)
}) })
.unwrap_or(uuid::Uuid::new_v4()); .ok();
#[cfg(not(any( #[cfg(not(any(
target_os = "linux", target_os = "linux",
@@ -103,9 +114,18 @@ pub fn get_machine_id() -> uuid::Uuid {
target_os = "windows", target_os = "windows",
target_os = "freebsd" target_os = "freebsd"
)))] )))]
let gen_mid = None;
if gen_mid.is_some() {
return gen_mid.unwrap();
}
let gen_mid = uuid::Uuid::new_v4(); let gen_mid = uuid::Uuid::new_v4();
// TODO: save to local file // try save to local file
if let Ok(mut file) = std::fs::File::create(machine_id_file) {
let _ = file.write_all(gen_mid.to_string().as_bytes());
}
gen_mid gen_mid
} }
+145 -77
View File
@@ -1,6 +1,13 @@
// try connect peers directly, with either its public ip or lan ip // try connect peers directly, with either its public ip or lan ip
use std::{net::SocketAddr, sync::Arc, time::Duration}; use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use crate::{ use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
@@ -29,6 +36,8 @@ use super::create_connector_by_url;
pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1; pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1;
pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300; pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
static TESTING: AtomicBool = AtomicBool::new(false);
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait PeerManagerForDirectConnector { pub trait PeerManagerForDirectConnector {
async fn list_peers(&self) -> Vec<PeerId>; async fn list_peers(&self) -> Vec<PeerId>;
@@ -182,7 +191,7 @@ impl DirectConnectorManager {
// let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?; // let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?;
if peer_id != dst_peer_id { if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) {
tracing::info!( tracing::info!(
"connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}", "connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}",
addr, addr,
@@ -279,87 +288,103 @@ impl DirectConnectorManager {
let listener_host = listener.socket_addrs(|| None).unwrap().pop(); let listener_host = listener.socket_addrs(|| None).unwrap().pop();
match listener_host { match listener_host {
Some(SocketAddr::V4(_)) => { Some(SocketAddr::V4(s_addr)) => {
ip_list.interface_ipv4s.iter().for_each(|ip| { if s_addr.ip().is_unspecified() {
let mut addr = (*listener).clone(); ip_list.interface_ipv4s.iter().for_each(|ip| {
if addr.set_host(Some(ip.to_string().as_str())).is_ok() { let mut addr = (*listener).clone();
tasks.spawn(Self::try_connect_to_ip( if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
data.clone(), tasks.spawn(Self::try_connect_to_ip(
dst_peer_id.clone(), data.clone(),
addr.to_string(), dst_peer_id.clone(),
)); addr.to_string(),
} else { ));
tracing::error!( } else {
?ip, tracing::error!(
?listener, ?ip,
?dst_peer_id, ?listener,
"failed to set host for interface ipv4" ?dst_peer_id,
); "failed to set host for interface ipv4"
} );
}); }
});
if let Some(public_ipv4) = ip_list.public_ipv4 { if let Some(public_ipv4) = ip_list.public_ipv4 {
let mut addr = (*listener).clone(); let mut addr = (*listener).clone();
if addr if addr
.set_host(Some(public_ipv4.to_string().as_str())) .set_host(Some(public_ipv4.to_string().as_str()))
.is_ok() .is_ok()
{ {
tasks.spawn(Self::try_connect_to_ip( tasks.spawn(Self::try_connect_to_ip(
data.clone(), data.clone(),
dst_peer_id.clone(), dst_peer_id.clone(),
addr.to_string(), addr.to_string(),
)); ));
} else { } else {
tracing::error!( tracing::error!(
?public_ipv4, ?public_ipv4,
?listener, ?listener,
?dst_peer_id, ?dst_peer_id,
"failed to set host for public ipv4" "failed to set host for public ipv4"
); );
}
} }
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
listener.to_string(),
));
} }
} }
Some(SocketAddr::V6(_)) => { Some(SocketAddr::V6(s_addr)) => {
ip_list.interface_ipv6s.iter().for_each(|ip| { if s_addr.ip().is_unspecified() {
let mut addr = (*listener).clone(); ip_list.interface_ipv6s.iter().for_each(|ip| {
if addr let mut addr = (*listener).clone();
.set_host(Some(format!("[{}]", ip.to_string()).as_str())) if addr
.is_ok() .set_host(Some(format!("[{}]", ip.to_string()).as_str()))
{ .is_ok()
tasks.spawn(Self::try_connect_to_ip( {
data.clone(), tasks.spawn(Self::try_connect_to_ip(
dst_peer_id.clone(), data.clone(),
addr.to_string(), dst_peer_id.clone(),
)); addr.to_string(),
} else { ));
tracing::error!( } else {
?ip, tracing::error!(
?listener, ?ip,
?dst_peer_id, ?listener,
"failed to set host for interface ipv6" ?dst_peer_id,
); "failed to set host for interface ipv6"
} );
}); }
});
if let Some(public_ipv6) = ip_list.public_ipv6 { if let Some(public_ipv6) = ip_list.public_ipv6 {
let mut addr = (*listener).clone(); let mut addr = (*listener).clone();
if addr if addr
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str())) .set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
.is_ok() .is_ok()
{ {
tasks.spawn(Self::try_connect_to_ip( tasks.spawn(Self::try_connect_to_ip(
data.clone(), data.clone(),
dst_peer_id.clone(), dst_peer_id.clone(),
addr.to_string(), addr.to_string(),
)); ));
} else { } else {
tracing::error!( tracing::error!(
?public_ipv6, ?public_ipv6,
?listener, ?listener,
?dst_peer_id, ?dst_peer_id,
"failed to set host for public ipv6" "failed to set host for public ipv6"
); );
}
} }
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
listener.to_string(),
));
} }
} }
p => { p => {
@@ -452,6 +477,49 @@ mod tests {
proto::peer_rpc::GetIpListResponse, proto::peer_rpc::GetIpListResponse,
}; };
use super::TESTING;
#[tokio::test]
async fn direct_connector_mapped_listener() {
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
let p_c = create_mock_peer_manager().await;
let p_x = create_mock_peer_manager().await;
connect_peer_manager(p_a.clone(), p_b.clone()).await;
connect_peer_manager(p_b.clone(), p_c.clone()).await;
connect_peer_manager(p_c.clone(), p_x.clone()).await;
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
wait_route_appear(p_a.clone(), p_x.clone()).await.unwrap();
let mut f = p_a.get_global_ctx().get_flags();
f.bind_device = false;
p_a.get_global_ctx().config.set_flags(f);
p_c.get_global_ctx()
.config
.set_mapped_listeners(Some(vec!["tcp://127.0.0.1:11334".parse().unwrap()]));
p_x.get_global_ctx()
.config
.set_listeners(vec!["tcp://0.0.0.0:11334".parse().unwrap()]);
let mut lis_x = ListenerManager::new(p_x.get_global_ctx(), p_x.clone());
lis_x.prepare_listeners().await.unwrap();
lis_x.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let mut dm_a = DirectConnectorManager::new(p_a.get_global_ctx(), p_a.clone());
let mut dm_c = DirectConnectorManager::new(p_c.get_global_ctx(), p_c.clone());
dm_a.run_as_client();
dm_c.run_as_server();
// p_c's mapped listener is p_x's listener, so p_a should connect to p_x directly
wait_route_appear_with_cost(p_a.clone(), p_x.my_peer_id(), Some(1))
.await
.unwrap();
}
#[rstest::rstest] #[rstest::rstest]
#[tokio::test] #[tokio::test]
async fn direct_connector_basic_test( async fn direct_connector_basic_test(
+8 -6
View File
@@ -297,12 +297,14 @@ impl ManualConnectorManager {
connector.lock().await.set_ip_version(ip_version); connector.lock().await.set_ip_version(ip_version);
set_bind_addr_for_peer_connector( if data.global_ctx.config.get_flags().bind_device {
connector.lock().await.as_mut(), set_bind_addr_for_peer_connector(
ip_version == IpVersion::V4, connector.lock().await.as_mut(),
&ip_collector, ip_version == IpVersion::V4,
) &ip_collector,
.await; )
.await;
}
data.global_ctx.issue_event(GlobalCtxEvent::Connecting( data.global_ctx.issue_event(GlobalCtxEvent::Connecting(
connector.lock().await.remote_url().clone(), connector.lock().await.remote_url().clone(),
+40 -30
View File
@@ -56,23 +56,27 @@ pub async fn create_connector_by_url(
"tcp" => { "tcp" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "tcp")?; let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "tcp")?;
let mut connector = TcpTunnelConnector::new(url); let mut connector = TcpTunnelConnector::new(url);
set_bind_addr_for_peer_connector( if global_ctx.config.get_flags().bind_device {
&mut connector, set_bind_addr_for_peer_connector(
dst_addr.is_ipv4(), &mut connector,
&global_ctx.get_ip_collector(), dst_addr.is_ipv4(),
) &global_ctx.get_ip_collector(),
.await; )
.await;
}
return Ok(Box::new(connector)); return Ok(Box::new(connector));
} }
"udp" => { "udp" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "udp")?; let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "udp")?;
let mut connector = UdpTunnelConnector::new(url); let mut connector = UdpTunnelConnector::new(url);
set_bind_addr_for_peer_connector( if global_ctx.config.get_flags().bind_device {
&mut connector, set_bind_addr_for_peer_connector(
dst_addr.is_ipv4(), &mut connector,
&global_ctx.get_ip_collector(), dst_addr.is_ipv4(),
) &global_ctx.get_ip_collector(),
.await; )
.await;
}
return Ok(Box::new(connector)); return Ok(Box::new(connector));
} }
"ring" => { "ring" => {
@@ -84,12 +88,14 @@ pub async fn create_connector_by_url(
"quic" => { "quic" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "quic")?; let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "quic")?;
let mut connector = QUICTunnelConnector::new(url); let mut connector = QUICTunnelConnector::new(url);
set_bind_addr_for_peer_connector( if global_ctx.config.get_flags().bind_device {
&mut connector, set_bind_addr_for_peer_connector(
dst_addr.is_ipv4(), &mut connector,
&global_ctx.get_ip_collector(), dst_addr.is_ipv4(),
) &global_ctx.get_ip_collector(),
.await; )
.await;
}
return Ok(Box::new(connector)); return Ok(Box::new(connector));
} }
#[cfg(feature = "wireguard")] #[cfg(feature = "wireguard")]
@@ -101,12 +107,14 @@ pub async fn create_connector_by_url(
&nid.network_secret.unwrap_or_default(), &nid.network_secret.unwrap_or_default(),
); );
let mut connector = WgTunnelConnector::new(url, wg_config); let mut connector = WgTunnelConnector::new(url, wg_config);
set_bind_addr_for_peer_connector( if global_ctx.config.get_flags().bind_device {
&mut connector, set_bind_addr_for_peer_connector(
dst_addr.is_ipv4(), &mut connector,
&global_ctx.get_ip_collector(), dst_addr.is_ipv4(),
) &global_ctx.get_ip_collector(),
.await; )
.await;
}
return Ok(Box::new(connector)); return Ok(Box::new(connector));
} }
#[cfg(feature = "websocket")] #[cfg(feature = "websocket")]
@@ -114,12 +122,14 @@ pub async fn create_connector_by_url(
use crate::tunnel::{FromUrl, IpVersion}; use crate::tunnel::{FromUrl, IpVersion};
let dst_addr = SocketAddr::from_url(url.clone(), IpVersion::Both)?; let dst_addr = SocketAddr::from_url(url.clone(), IpVersion::Both)?;
let mut connector = crate::tunnel::websocket::WSTunnelConnector::new(url); let mut connector = crate::tunnel::websocket::WSTunnelConnector::new(url);
set_bind_addr_for_peer_connector( if global_ctx.config.get_flags().bind_device {
&mut connector, set_bind_addr_for_peer_connector(
dst_addr.is_ipv4(), &mut connector,
&global_ctx.get_ip_collector(), dst_addr.is_ipv4(),
) &global_ctx.get_ip_collector(),
.await; )
.await;
}
return Ok(Box::new(connector)); return Ok(Box::new(connector));
} }
_ => { _ => {
+34 -1
View File
@@ -123,6 +123,13 @@ struct Cli {
)] )]
listeners: Vec<String>, listeners: Vec<String>,
#[arg(
long,
help = t!("core_clap.mapped_listeners").to_string(),
num_args = 0..
)]
mapped_listeners: Vec<String>,
#[arg( #[arg(
long, long,
help = t!("core_clap.no_listener").to_string(), help = t!("core_clap.no_listener").to_string(),
@@ -185,7 +192,7 @@ struct Cli {
#[arg( #[arg(
long, long,
help = t!("core_clap.multi_thread").to_string(), help = t!("core_clap.multi_thread").to_string(),
default_value = "false" default_value = "true"
)] )]
multi_thread: bool, multi_thread: bool,
@@ -300,6 +307,12 @@ struct Cli {
default_value = "none", default_value = "none",
)] )]
compression: String, compression: String,
#[arg(
long,
help = t!("core_clap.bind_device").to_string()
)]
bind_device: Option<bool>,
} }
rust_i18n::i18n!("locales", fallback = "en"); rust_i18n::i18n!("locales", fallback = "en");
@@ -422,6 +435,23 @@ impl TryFrom<&Cli> for TomlConfigLoader {
.collect(), .collect(),
); );
cfg.set_mapped_listeners(Some(
cli.mapped_listeners
.iter()
.map(|s| {
s.parse()
.with_context(|| format!("mapped listener is not a valid url: {}", s))
.unwrap()
})
.map(|s: url::Url| {
if s.port().is_none() {
panic!("mapped listener port is missing: {}", s);
}
s
})
.collect(),
));
for n in cli.proxy_networks.iter() { for n in cli.proxy_networks.iter() {
cfg.add_proxy_cidr( cfg.add_proxy_cidr(
n.parse() n.parse()
@@ -534,6 +564,9 @@ impl TryFrom<&Cli> for TomlConfigLoader {
), ),
} }
.into(); .into();
if let Some(bind_device) = cli.bind_device {
f.bind_device = bind_device;
}
cfg.set_flags(f); cfg.set_flags(f);
cfg.set_exit_nodes(cli.exit_nodes.clone()); cfg.set_exit_nodes(cli.exit_nodes.clone());
+66 -43
View File
@@ -4,6 +4,7 @@ use std::{
time::Duration, time::Duration,
}; };
use bytes::{BufMut, BytesMut};
use cidr::Ipv4Inet; use cidr::Ipv4Inet;
use crossbeam::atomic::AtomicCell; use crossbeam::atomic::AtomicCell;
use dashmap::DashMap; use dashmap::DashMap;
@@ -24,11 +25,11 @@ use tokio::{
use tracing::Level; use tracing::Level;
use crate::{ use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, common::{error::Error, global_ctx::ArcGlobalCtx, scoped_task::ScopedTask, PeerId},
gateway::ip_reassembler::compose_ipv4_packet, gateway::ip_reassembler::compose_ipv4_packet,
peers::{peer_manager::PeerManager, PeerPacketFilter}, peers::{peer_manager::PeerManager, PeerPacketFilter},
tunnel::{ tunnel::{
common::setup_sokcet2, common::{reserve_buf, setup_sokcet2},
packet_def::{PacketType, ZCPacket}, packet_def::{PacketType, ZCPacket},
}, },
}; };
@@ -139,59 +140,81 @@ impl UdpNatEntry {
mut packet_sender: Sender<ZCPacket>, mut packet_sender: Sender<ZCPacket>,
virtual_ipv4: Ipv4Addr, virtual_ipv4: Ipv4Addr,
) { ) {
let mut buf = [0u8; 65536]; let (s, mut r) = tachyonix::channel(128);
let mut udp_body: &mut [u8] = unsafe { std::mem::transmute(&mut buf[20 + 8..]) };
let mut ip_id = 1;
loop { let self_clone = self.clone();
let (len, src_socket) = match timeout( let recv_task = ScopedTask::from(tokio::spawn(async move {
Duration::from_secs(120), let mut cur_buf = BytesMut::new();
self.socket.recv_from(&mut udp_body), loop {
) if self_clone
.await .stopped
{ .load(std::sync::atomic::Ordering::Relaxed)
Ok(Ok(x)) => x, {
Ok(Err(err)) => {
tracing::error!(?err, "udp nat recv failed");
break; break;
} }
Err(err) => {
tracing::error!(?err, "udp nat recv timeout"); reserve_buf(&mut cur_buf, 64 * 1024 + 28, 128 * 1024 + 28);
break; assert_eq!(cur_buf.len(), 0);
unsafe {
cur_buf.advance_mut(28);
} }
};
tracing::trace!(?len, ?src_socket, "udp nat packet response received"); let (len, src_socket) = match timeout(
Duration::from_secs(120),
self_clone.socket.recv_buf_from(&mut cur_buf),
)
.await
{
Ok(Ok(x)) => x,
Ok(Err(err)) => {
tracing::error!(?err, "udp nat recv failed");
break;
}
Err(err) => {
tracing::error!(?err, "udp nat recv timeout");
break;
}
};
if self.stopped.load(std::sync::atomic::Ordering::Relaxed) { tracing::trace!(?len, ?src_socket, "udp nat packet response received");
break;
let ret_buf = cur_buf.split();
s.send((ret_buf, len, src_socket)).await.unwrap();
} }
}));
let SocketAddr::V4(mut src_v4) = src_socket else { let self_clone = self.clone();
continue; let send_task = ScopedTask::from(tokio::spawn(async move {
}; let mut ip_id = 1;
while let Ok((mut packet, len, src_socket)) = r.recv().await {
let SocketAddr::V4(mut src_v4) = src_socket else {
continue;
};
self.mark_active(); self_clone.mark_active();
if src_v4.ip().is_loopback() { if src_v4.ip().is_loopback() {
src_v4.set_ip(virtual_ipv4); src_v4.set_ip(virtual_ipv4);
}
let Ok(_) = Self::compose_ipv4_packet(
&self_clone,
&mut packet_sender,
&mut packet,
&src_v4,
len,
1280,
ip_id,
)
.await
else {
break;
};
ip_id = ip_id.wrapping_add(1);
} }
}));
let Ok(_) = Self::compose_ipv4_packet( let _ = tokio::join!(recv_task, send_task);
&self,
&mut packet_sender,
&mut buf,
&src_v4,
len,
1200,
ip_id,
)
.await
else {
break;
};
ip_id = ip_id.wrapping_add(1);
}
self.stop(); self.stop();
} }
+153 -72
View File
@@ -1,8 +1,7 @@
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use tokio::{sync::Mutex, task::JoinSet}; use tokio::task::JoinSet;
#[cfg(feature = "quic")] #[cfg(feature = "quic")]
use crate::tunnel::quic::QUICTunnelListener; use crate::tunnel::quic::QUICTunnelListener;
@@ -63,16 +62,20 @@ impl TunnelHandlerForListener for PeerManager {
} }
} }
#[derive(Debug, Clone)] pub trait ListenerCreatorTrait: Fn() -> Box<dyn TunnelListener> + Send + Sync {}
struct Listener { impl<T: Send + Sync> ListenerCreatorTrait for T where T: Fn() -> Box<dyn TunnelListener> + Send {}
inner: Arc<Mutex<dyn TunnelListener>>, pub type ListenerCreator = Box<dyn ListenerCreatorTrait>;
#[derive(Clone)]
struct ListenerFactory {
creator_fn: Arc<ListenerCreator>,
must_succ: bool, must_succ: bool,
} }
pub struct ListenerManager<H> { pub struct ListenerManager<H> {
global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx,
net_ns: NetNS, net_ns: NetNS,
listeners: Vec<Listener>, listeners: Vec<ListenerFactory>,
peer_manager: Arc<H>, peer_manager: Arc<H>,
tasks: JoinSet<()>, tasks: JoinSet<()>,
@@ -90,31 +93,39 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
} }
pub async fn prepare_listeners(&mut self) -> Result<(), Error> { pub async fn prepare_listeners(&mut self) -> Result<(), Error> {
let self_id = self.global_ctx.get_id();
self.add_listener( self.add_listener(
RingTunnelListener::new( move || {
format!("ring://{}", self.global_ctx.get_id()) Box::new(RingTunnelListener::new(
.parse() format!("ring://{}", self_id).parse().unwrap(),
.unwrap(), ))
), },
true, true,
) )
.await?; .await?;
for l in self.global_ctx.config.get_listener_uris().iter() { for l in self.global_ctx.config.get_listener_uris().iter() {
let Ok(lis) = get_listener_by_url(l, self.global_ctx.clone()) else { let l = l.clone();
let Ok(_) = get_listener_by_url(&l, self.global_ctx.clone()) else {
let msg = format!("failed to get listener by url: {}, maybe not supported", l); let msg = format!("failed to get listener by url: {}, maybe not supported", l);
self.global_ctx self.global_ctx
.issue_event(GlobalCtxEvent::ListenerAddFailed(l.clone(), msg)); .issue_event(GlobalCtxEvent::ListenerAddFailed(l.clone(), msg));
continue; continue;
}; };
self.add_listener(lis, true).await?; let ctx = self.global_ctx.clone();
self.add_listener(move || get_listener_by_url(&l, ctx.clone()).unwrap(), true)
.await?;
} }
if self.global_ctx.config.get_flags().enable_ipv6 { if self.global_ctx.config.get_flags().enable_ipv6 {
let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone(); let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone();
let _ = self let _ = self
.add_listener( .add_listener(
UdpTunnelListener::new(ipv6_listener.parse().unwrap()), move || {
Box::new(UdpTunnelListener::new(
ipv6_listener.clone().parse().unwrap(),
))
},
false, false,
) )
.await?; .await?;
@@ -123,85 +134,91 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
Ok(()) Ok(())
} }
pub async fn add_listener<L>(&mut self, listener: L, must_succ: bool) -> Result<(), Error> pub async fn add_listener<C: ListenerCreatorTrait + 'static>(
where &mut self,
L: TunnelListener + 'static, creator: C,
{ must_succ: bool,
let listener = Arc::new(Mutex::new(listener)); ) -> Result<(), Error> {
self.listeners.push(Listener { self.listeners.push(ListenerFactory {
inner: listener, creator_fn: Arc::new(Box::new(creator)),
must_succ, must_succ,
}); });
Ok(()) Ok(())
} }
#[tracing::instrument] #[tracing::instrument(skip(creator))]
async fn run_listener( async fn run_listener(
listener: Arc<Mutex<dyn TunnelListener>>, creator: Arc<ListenerCreator>,
peer_manager: Arc<H>, peer_manager: Arc<H>,
global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx,
) { ) {
let mut l = listener.lock().await;
global_ctx.add_running_listener(l.local_url());
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
loop { loop {
let ret = match l.accept().await { let mut l = (creator)();
Ok(ret) => ret, let _g = global_ctx.net_ns.guard();
match l.listen().await {
Ok(_) => {
global_ctx.add_running_listener(l.local_url());
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
}
Err(e) => { Err(e) => {
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed( global_ctx.issue_event(GlobalCtxEvent::ListenerAddFailed(
l.local_url(), l.local_url(),
e.to_string(), format!("error: {:?}, retry listen later...", e),
)); ));
tracing::error!(?e, ?l, "listener accept error"); tracing::error!(?e, ?l, "listener listen error");
tokio::time::sleep(std::time::Duration::from_secs(1)).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue; continue;
} }
}; }
loop {
let ret = match l.accept().await {
Ok(ret) => ret,
Err(e) => {
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed(
l.local_url(),
format!("error: {:?}, retry listen later...", e),
));
tracing::error!(?e, ?l, "listener accept error");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
};
let tunnel_info = ret.info().unwrap(); let tunnel_info = ret.info().unwrap();
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted( global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
tunnel_info tunnel_info
.local_addr .local_addr
.clone() .clone()
.unwrap_or_default() .unwrap_or_default()
.to_string(), .to_string(),
tunnel_info tunnel_info
.remote_addr .remote_addr
.clone() .clone()
.unwrap_or_default() .unwrap_or_default()
.to_string(), .to_string(),
)); ));
tracing::info!(ret = ?ret, "conn accepted"); tracing::info!(ret = ?ret, "conn accepted");
let peer_manager = peer_manager.clone(); let peer_manager = peer_manager.clone();
let global_ctx = global_ctx.clone(); let global_ctx = global_ctx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let server_ret = peer_manager.handle_tunnel(ret).await; let server_ret = peer_manager.handle_tunnel(ret).await;
if let Err(e) = &server_ret { if let Err(e) = &server_ret {
global_ctx.issue_event(GlobalCtxEvent::ConnectionError( global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
tunnel_info.local_addr.unwrap_or_default().to_string(), tunnel_info.local_addr.unwrap_or_default().to_string(),
tunnel_info.remote_addr.unwrap_or_default().to_string(), tunnel_info.remote_addr.unwrap_or_default().to_string(),
e.to_string(), e.to_string(),
)); ));
tracing::error!(error = ?e, "handle conn error"); tracing::error!(error = ?e, "handle conn error");
} }
}); });
}
} }
} }
pub async fn run(&mut self) -> Result<(), Error> { pub async fn run(&mut self) -> Result<(), Error> {
for listener in &self.listeners { for listener in &self.listeners {
let _guard = self.net_ns.guard();
let addr = listener.inner.lock().await.local_url();
tracing::warn!("run listener: {:?}", listener);
listener
.inner
.lock()
.await
.listen()
.await
.with_context(|| format!("failed to add listener {}", addr))?;
self.tasks.spawn(Self::run_listener( self.tasks.spawn(Self::run_listener(
listener.inner.clone(), listener.creator_fn.clone(),
self.peer_manager.clone(), self.peer_manager.clone(),
self.global_ctx.clone(), self.global_ctx.clone(),
)); ));
@@ -213,12 +230,14 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::atomic::{AtomicI32, Ordering};
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use tokio::time::timeout; use tokio::time::timeout;
use crate::{ use crate::{
common::global_ctx::tests::get_mock_global_ctx, common::global_ctx::tests::get_mock_global_ctx,
tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector}, tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector, TunnelError},
}; };
use super::*; use super::*;
@@ -245,12 +264,18 @@ mod tests {
let ring_id = format!("ring://{}", uuid::Uuid::new_v4()); let ring_id = format!("ring://{}", uuid::Uuid::new_v4());
let ring_id_clone = ring_id.clone();
listener_mgr listener_mgr
.add_listener(RingTunnelListener::new(ring_id.parse().unwrap()), true) .add_listener(
move || Box::new(RingTunnelListener::new(ring_id_clone.parse().unwrap())),
true,
)
.await .await
.unwrap(); .unwrap();
listener_mgr.run().await.unwrap(); listener_mgr.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let connect_once = |ring_id| async move { let connect_once = |ring_id| async move {
let tunnel = RingTunnelConnector::new(ring_id).connect().await.unwrap(); let tunnel = RingTunnelConnector::new(ring_id).connect().await.unwrap();
let (mut recv, _send) = tunnel.split(); let (mut recv, _send) = tunnel.split();
@@ -269,4 +294,60 @@ mod tests {
.await .await
.unwrap(); .unwrap();
} }
#[tokio::test]
async fn retry_listen() {
let counter = Arc::new(AtomicI32::new(0));
let drop_counter = Arc::new(AtomicI32::new(0));
struct MockListener {
counter: Arc<AtomicI32>,
drop_counter: Arc<AtomicI32>,
}
#[async_trait::async_trait]
impl TunnelListener for MockListener {
fn local_url(&self) -> url::Url {
"mock://".parse().unwrap()
}
async fn listen(&mut self) -> Result<(), TunnelError> {
self.counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Err(TunnelError::BufferFull)
}
}
impl Drop for MockListener {
fn drop(&mut self) {
self.drop_counter.fetch_add(1, Ordering::Relaxed);
}
}
let handler = Arc::new(MockListenerHandler {});
let mut listener_mgr = ListenerManager::new(get_mock_global_ctx(), handler.clone());
let counter_clone = counter.clone();
let drop_counter_clone = drop_counter.clone();
listener_mgr
.add_listener(
move || {
Box::new(MockListener {
counter: counter_clone.clone(),
drop_counter: drop_counter_clone.clone(),
})
},
true,
)
.await
.unwrap();
listener_mgr.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
assert!(counter.load(Ordering::Relaxed) >= 2);
assert!(drop_counter.load(Ordering::Relaxed) >= 1);
}
} }
+2 -8
View File
@@ -484,13 +484,6 @@ mod tests {
let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end)); let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
let c = TunnelWithFilter::new(c, c_recorder.clone()); let c = TunnelWithFilter::new(c, c_recorder.clone());
let s = if drop_both {
let s_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
Box::new(TunnelWithFilter::new(s, s_recorder.clone()))
} else {
s
};
let c_peer_id = new_peer_id(); let c_peer_id = new_peer_id();
let s_peer_id = new_peer_id(); let s_peer_id = new_peer_id();
@@ -506,6 +499,7 @@ mod tests {
s_peer s_peer
.start_recv_loop(tokio::sync::mpsc::channel(200).0) .start_recv_loop(tokio::sync::mpsc::channel(200).0)
.await; .await;
// do not start ping for s, s only reponde to ping from c
assert!(c_ret.is_ok()); assert!(c_ret.is_ok());
assert!(s_ret.is_ok()); assert!(s_ret.is_ok());
@@ -547,7 +541,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn peer_conn_pingpong_bothside_timeout() { async fn peer_conn_pingpong_bothside_timeout() {
peer_conn_pingpong_test_common(4, 12, true, true).await; peer_conn_pingpong_test_common(3, 14, true, true).await;
} }
#[tokio::test] #[tokio::test]
+21 -21
View File
@@ -289,29 +289,29 @@ impl PeerConnPinger {
"pingpong task recv pingpong_once result" "pingpong task recv pingpong_once result"
); );
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) { let current_rx_packets = throughput.rx_packets();
let current_rx_packets = throughput.rx_packets(); if last_rx_packets != current_rx_packets {
let need_close = if last_rx_packets != current_rx_packets { // if we receive some packet from peers, reset the counter to avoid conn close.
// if we receive some packet from peers, we should relax the condition // conn will close only if we have 5 continous round pingpong loss after no packet received.
counter > 50 && loss_rate_1 > 0.5 counter = 0;
}
// TODO: wait more time to see if the loss rate is still high after no rx tracing::debug!(
} else { "counter: {}, loss_rate_1: {}, loss_rate_20: {}, cur_rx_packets: {}, last_rx: {}, node_id: {}",
true counter, loss_rate_1, loss_rate_20, current_rx_packets, last_rx_packets, my_node_id
}; );
if need_close { if (counter > 5 && loss_rate_20 > 0.74) || (counter > 100 && loss_rate_1 > 0.35) {
tracing::warn!( tracing::warn!(
?ret, ?ret,
?self, ?self,
?loss_rate_1, ?loss_rate_1,
?loss_rate_20, ?loss_rate_20,
?last_rx_packets, ?last_rx_packets,
?current_rx_packets, ?current_rx_packets,
"pingpong loss rate too high, closing" "pingpong loss rate too high, closing"
); );
break; break;
}
} }
last_rx_packets = throughput.rx_packets(); last_rx_packets = throughput.rx_packets();
-1
View File
@@ -1739,7 +1739,6 @@ impl RouteSessionManager {
continue; continue;
} }
let _ = self.stop_session(*peer_id); let _ = self.stop_session(*peer_id);
assert_ne!(Some(*peer_id), cur_dst_peer_id_to_initiate);
} }
} }
+3 -1
View File
@@ -24,8 +24,10 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer {
let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await; let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await;
ret.listeners = self ret.listeners = self
.global_ctx .global_ctx
.get_running_listeners() .config
.get_mapped_listeners()
.into_iter() .into_iter()
.chain(self.global_ctx.get_running_listeners().into_iter())
.map(Into::into) .map(Into::into)
.collect(); .collect();
Ok(ret) Ok(ret)
+1
View File
@@ -21,6 +21,7 @@ message FlagsInConfig {
string ipv6_listener = 14; string ipv6_listener = 14;
bool multi_thread = 15; bool multi_thread = 15;
CompressionAlgoPb data_compress_algo = 16; CompressionAlgoPb data_compress_algo = 16;
bool bind_device = 17;
} }
message RpcDescriptor { message RpcDescriptor {
+4 -1
View File
@@ -159,7 +159,10 @@ pub fn build_rpc_packet(
let cur_packet = RpcPacket { let cur_packet = RpcPacket {
from_peer, from_peer,
to_peer, to_peer,
descriptor: if cur_offset == 0 { descriptor: if cur_offset == 0
|| compression_info.algo == CompressionAlgoPb::None as i32
{
// old version must have descriptor on every piece
Some(rpc_desc.clone()) Some(rpc_desc.clone())
} else { } else {
None None
+40 -20
View File
@@ -28,6 +28,30 @@ impl TcpTunnelListener {
listener: None, listener: None,
} }
} }
async fn do_accept(&mut self) -> Result<Box<dyn Tunnel>, std::io::Error> {
let listener = self.listener.as_ref().unwrap();
let (stream, _) = listener.accept().await?;
if let Err(e) = stream.set_nodelay(true) {
tracing::warn!(?e, "set_nodelay fail in accept");
}
let info = TunnelInfo {
tunnel_type: "tcp".to_owned(),
local_addr: Some(self.local_url().into()),
remote_addr: Some(
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
),
};
let (r, w) = stream.into_split();
Ok(Box::new(TunnelWrapper::new(
FramedReader::new(r, TCP_MTU_BYTES),
FramedWriter::new(w),
Some(info),
)))
}
} }
#[async_trait] #[async_trait]
@@ -57,27 +81,23 @@ impl TunnelListener for TcpTunnelListener {
} }
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> { async fn accept(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> {
let listener = self.listener.as_ref().unwrap(); loop {
let (stream, _) = listener.accept().await?; match self.do_accept().await {
Ok(ret) => return Ok(ret),
if let Err(e) = stream.set_nodelay(true) { Err(e) => {
tracing::warn!(?e, "set_nodelay fail in accept"); use std::io::ErrorKind::*;
if matches!(
e.kind(),
NotConnected | ConnectionAborted | ConnectionRefused | ConnectionReset
) {
tracing::warn!(?e, "accept fail with retryable error: {:?}", e);
continue;
}
tracing::warn!(?e, "accept fail");
return Err(e.into());
}
}
} }
let info = TunnelInfo {
tunnel_type: "tcp".to_owned(),
local_addr: Some(self.local_url().into()),
remote_addr: Some(
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
),
};
let (r, w) = stream.into_split();
Ok(Box::new(TunnelWrapper::new(
FramedReader::new(r, TCP_MTU_BYTES),
FramedWriter::new(w),
Some(info),
)))
} }
fn local_url(&self) -> url::Url { fn local_url(&self) -> url::Url {