mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-16 10:55:37 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c23b544c34 | |||
| 9d76b86f49 | |||
| bb0ccca3e5 | |||
| 306817ae9a | |||
| d2ec60e108 | |||
| e016aeddeb | |||
| a4419a31fd |
@@ -21,7 +21,7 @@ on:
|
|||||||
version:
|
version:
|
||||||
description: 'Version for this release'
|
description: 'Version for this release'
|
||||||
type: string
|
type: string
|
||||||
default: 'v2.1.1'
|
default: 'v2.1.2'
|
||||||
required: true
|
required: true
|
||||||
make_latest:
|
make_latest:
|
||||||
description: 'Mark this release as latest'
|
description: 'Mark this release as latest'
|
||||||
|
|||||||
Generated
+2
-2
@@ -1830,7 +1830,7 @@ checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "easytier"
|
name = "easytier"
|
||||||
version = "2.1.1"
|
version = "2.1.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aes-gcm",
|
"aes-gcm",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
@@ -1926,7 +1926,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "easytier-gui"
|
name = "easytier-gui"
|
||||||
version = "2.1.1"
|
version = "2.1.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
|
|||||||
- **High Availability**: Supports multi-path and switches to healthy paths when high packet loss or network errors are detected.
|
- **High Availability**: Supports multi-path and switches to healthy paths when high packet loss or network errors are detected.
|
||||||
- **IPv6 Support**: Supports networking using IPv6.
|
- **IPv6 Support**: Supports networking using IPv6.
|
||||||
- **Multiple Protocol Types**: Supports communication between nodes using protocols such as WebSocket and QUIC.
|
- **Multiple Protocol Types**: Supports communication between nodes using protocols such as WebSocket and QUIC.
|
||||||
|
- **Web Management Interface**: Provides a [web-based management](https://easytier.cn/web) interface for easy configuration and monitoring.
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
@@ -52,7 +53,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
|
|||||||
|
|
||||||
4. **Install by Docker Compose**
|
4. **Install by Docker Compose**
|
||||||
|
|
||||||
Please visit the [EasyTier Official Website](https://www.easytier.top/en/) to view the full documentation.
|
Please visit the [EasyTier Official Website](https://www.easytier.cn/en/) to view the full documentation.
|
||||||
|
|
||||||
5. **Install by script (For Linux Only)**
|
5. **Install by script (For Linux Only)**
|
||||||
|
|
||||||
@@ -200,20 +201,20 @@ Subnet proxy information will automatically sync to each node in the virtual net
|
|||||||
|
|
||||||
### Networking without Public IP
|
### Networking without Public IP
|
||||||
|
|
||||||
EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.top:11010``.
|
EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.cn:11010``.
|
||||||
|
|
||||||
When using shared nodes, each node entering the network needs to provide the same ``--network-name`` and ``--network-secret`` parameters as the unique identifier of the network.
|
When using shared nodes, each node entering the network needs to provide the same ``--network-name`` and ``--network-secret`` parameters as the unique identifier of the network.
|
||||||
|
|
||||||
Taking two nodes as an example, Node A executes:
|
Taking two nodes as an example, Node A executes:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
|
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
|
||||||
```
|
```
|
||||||
|
|
||||||
Node B executes
|
Node B executes
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
|
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
|
||||||
```
|
```
|
||||||
|
|
||||||
After the command is successfully executed, Node A can access Node B through the virtual IP 10.144.144.2.
|
After the command is successfully executed, Node A can access Node B through the virtual IP 10.144.144.2.
|
||||||
@@ -286,7 +287,7 @@ Run you own public server cluster is exactly same as running an virtual network,
|
|||||||
You can also join the official public server cluster with following command:
|
You can also join the official public server cluster with following command:
|
||||||
|
|
||||||
```
|
```
|
||||||
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010
|
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
@@ -296,10 +297,8 @@ You can use ``easytier-core --help`` to view all configuration items
|
|||||||
|
|
||||||
## Roadmap
|
## Roadmap
|
||||||
|
|
||||||
- [ ] Improve documentation and user guides.
|
- [ ] Support features such TCP hole punching, KCP, FEC etc.
|
||||||
- [ ] Support features such as encryption, TCP hole punching, etc.
|
|
||||||
- [ ] Support iOS.
|
- [ ] Support iOS.
|
||||||
- [ ] Support Web configuration management.
|
|
||||||
|
|
||||||
## Community and Contribution
|
## Community and Contribution
|
||||||
|
|
||||||
|
|||||||
+8
-8
@@ -8,7 +8,7 @@
|
|||||||
|
|
||||||
[简体中文](/README_CN.md) | [English](/README.md)
|
[简体中文](/README_CN.md) | [English](/README.md)
|
||||||
|
|
||||||
**请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。**
|
**请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。**
|
||||||
|
|
||||||
一个简单、安全、去中心化的内网穿透 VPN 组网方案,使用 Rust 语言和 Tokio 框架实现。
|
一个简单、安全、去中心化的内网穿透 VPN 组网方案,使用 Rust 语言和 Tokio 框架实现。
|
||||||
|
|
||||||
@@ -31,6 +31,7 @@
|
|||||||
- **高可用性**:支持多路径和在检测到高丢包率或网络错误时切换到健康路径。
|
- **高可用性**:支持多路径和在检测到高丢包率或网络错误时切换到健康路径。
|
||||||
- **IPV6 支持**:支持利用 IPV6 组网。
|
- **IPV6 支持**:支持利用 IPV6 组网。
|
||||||
- **多协议类型**: 支持使用 WebSocket、QUIC 等协议进行节点间通信。
|
- **多协议类型**: 支持使用 WebSocket、QUIC 等协议进行节点间通信。
|
||||||
|
- **Web 管理界面**:支持通过 [Web 界面](https://easytier.cn)管理节点。
|
||||||
|
|
||||||
## 安装
|
## 安装
|
||||||
|
|
||||||
@@ -52,7 +53,7 @@
|
|||||||
|
|
||||||
4. **通过Docker Compose安装**
|
4. **通过Docker Compose安装**
|
||||||
|
|
||||||
请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。
|
请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。
|
||||||
|
|
||||||
5. **使用一键脚本安装 (仅适用于 Linux)**
|
5. **使用一键脚本安装 (仅适用于 Linux)**
|
||||||
|
|
||||||
@@ -199,20 +200,20 @@ sudo easytier-core --ipv4 10.144.144.2 -n 10.1.1.0/24
|
|||||||
|
|
||||||
### 无公网IP组网
|
### 无公网IP组网
|
||||||
|
|
||||||
EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.top:11010``。
|
EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.cn:11010``。
|
||||||
|
|
||||||
使用共享节点时,需要每个入网节点提供相同的 ``--network-name`` 和 ``--network-secret`` 参数,作为网络的唯一标识。
|
使用共享节点时,需要每个入网节点提供相同的 ``--network-name`` 和 ``--network-secret`` 参数,作为网络的唯一标识。
|
||||||
|
|
||||||
以双节点为例,节点 A 执行:
|
以双节点为例,节点 A 执行:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
|
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
|
||||||
```
|
```
|
||||||
|
|
||||||
节点 B 执行
|
节点 B 执行
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
|
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
|
||||||
```
|
```
|
||||||
|
|
||||||
命令执行成功后,节点 A 即可通过虚拟 IP 10.144.144.2 访问节点 B。
|
命令执行成功后,节点 A 即可通过虚拟 IP 10.144.144.2 访问节点 B。
|
||||||
@@ -289,7 +290,7 @@ connected_clients:
|
|||||||
也可以使用以下命令加入官方公共服务器集群,后续将实现公共服务器集群的节点间负载均衡:
|
也可以使用以下命令加入官方公共服务器集群,后续将实现公共服务器集群的节点间负载均衡:
|
||||||
|
|
||||||
```
|
```
|
||||||
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010
|
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
|
||||||
```
|
```
|
||||||
|
|
||||||
### 其他配置
|
### 其他配置
|
||||||
@@ -299,9 +300,8 @@ sudo easytier-core --network-name easytier --network-secret easytier -p tcp://pu
|
|||||||
## 路线图
|
## 路线图
|
||||||
|
|
||||||
- [ ] 完善文档和用户指南。
|
- [ ] 完善文档和用户指南。
|
||||||
- [ ] 支持 TCP 打洞等特性。
|
- [ ] 支持 TCP 打洞、KCP、FEC 等特性。
|
||||||
- [ ] 支持 iOS。
|
- [ ] 支持 iOS。
|
||||||
- [ ] 支持 Web 配置管理。
|
|
||||||
|
|
||||||
## 社区和贡献
|
## 社区和贡献
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
{
|
{
|
||||||
"name": "easytier-gui",
|
"name": "easytier-gui",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"version": "2.1.1",
|
"version": "2.1.2",
|
||||||
"private": true,
|
"private": true,
|
||||||
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
|
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "easytier-gui"
|
name = "easytier-gui"
|
||||||
version = "2.1.1"
|
version = "2.1.2"
|
||||||
description = "EasyTier GUI"
|
description = "EasyTier GUI"
|
||||||
authors = ["you"]
|
authors = ["you"]
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|||||||
@@ -17,7 +17,7 @@
|
|||||||
"createUpdaterArtifacts": false
|
"createUpdaterArtifacts": false
|
||||||
},
|
},
|
||||||
"productName": "easytier-gui",
|
"productName": "easytier-gui",
|
||||||
"version": "2.1.1",
|
"version": "2.1.2",
|
||||||
"identifier": "com.kkrainbow.easytier",
|
"identifier": "com.kkrainbow.easytier",
|
||||||
"plugins": {},
|
"plugins": {},
|
||||||
"app": {
|
"app": {
|
||||||
|
|||||||
+1
-1
@@ -3,7 +3,7 @@ name = "easytier"
|
|||||||
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
|
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
|
||||||
homepage = "https://github.com/EasyTier/EasyTier"
|
homepage = "https://github.com/EasyTier/EasyTier"
|
||||||
repository = "https://github.com/EasyTier/EasyTier"
|
repository = "https://github.com/EasyTier/EasyTier"
|
||||||
version = "2.1.1"
|
version = "2.1.2"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
authors = ["kkrainbow"]
|
authors = ["kkrainbow"]
|
||||||
keywords = ["vpn", "p2p", "network", "easytier"]
|
keywords = ["vpn", "p2p", "network", "easytier"]
|
||||||
|
|||||||
@@ -134,6 +134,12 @@ core_clap:
|
|||||||
compression:
|
compression:
|
||||||
en: "compression algorithm to use, support none, zstd. default is none"
|
en: "compression algorithm to use, support none, zstd. default is none"
|
||||||
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none"
|
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none"
|
||||||
|
mapped_listeners:
|
||||||
|
en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple."
|
||||||
|
zh-CN: "手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。例如:tcp://123.123.123.123:11223,可以指定多个。"
|
||||||
|
bind_device:
|
||||||
|
en: "bind the connector socket to physical devices to avoid routing issues. e.g.: subnet proxy segment conflicts with a node's segment, after binding the physical device, it can communicate with the node normally."
|
||||||
|
zh-CN: "将连接器的套接字绑定到物理设备以避免路由问题。比如子网代理网段与某节点的网段冲突,绑定物理设备后可以与该节点正常通信。"
|
||||||
|
|
||||||
core_app:
|
core_app:
|
||||||
panic_backtrace_save:
|
panic_backtrace_save:
|
||||||
|
|||||||
@@ -27,8 +27,9 @@ pub fn gen_default_flags() -> Flags {
|
|||||||
relay_all_peer_rpc: false,
|
relay_all_peer_rpc: false,
|
||||||
disable_udp_hole_punching: false,
|
disable_udp_hole_punching: false,
|
||||||
ipv6_listener: "udp://[::]:0".to_string(),
|
ipv6_listener: "udp://[::]:0".to_string(),
|
||||||
multi_thread: false,
|
multi_thread: true,
|
||||||
data_compress_algo: CompressionAlgoPb::None.into(),
|
data_compress_algo: CompressionAlgoPb::None.into(),
|
||||||
|
bind_device: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,6 +73,9 @@ pub trait ConfigLoader: Send + Sync {
|
|||||||
fn get_listeners(&self) -> Vec<url::Url>;
|
fn get_listeners(&self) -> Vec<url::Url>;
|
||||||
fn set_listeners(&self, listeners: Vec<url::Url>);
|
fn set_listeners(&self, listeners: Vec<url::Url>);
|
||||||
|
|
||||||
|
fn get_mapped_listeners(&self) -> Vec<url::Url>;
|
||||||
|
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>);
|
||||||
|
|
||||||
fn get_rpc_portal(&self) -> Option<SocketAddr>;
|
fn get_rpc_portal(&self) -> Option<SocketAddr>;
|
||||||
fn set_rpc_portal(&self, addr: SocketAddr);
|
fn set_rpc_portal(&self, addr: SocketAddr);
|
||||||
|
|
||||||
@@ -183,6 +187,7 @@ struct Config {
|
|||||||
dhcp: Option<bool>,
|
dhcp: Option<bool>,
|
||||||
network_identity: Option<NetworkIdentity>,
|
network_identity: Option<NetworkIdentity>,
|
||||||
listeners: Option<Vec<url::Url>>,
|
listeners: Option<Vec<url::Url>>,
|
||||||
|
mapped_listeners: Option<Vec<url::Url>>,
|
||||||
exit_nodes: Option<Vec<Ipv4Addr>>,
|
exit_nodes: Option<Vec<Ipv4Addr>>,
|
||||||
|
|
||||||
peer: Option<Vec<PeerConfig>>,
|
peer: Option<Vec<PeerConfig>>,
|
||||||
@@ -472,6 +477,19 @@ impl ConfigLoader for TomlConfigLoader {
|
|||||||
self.config.lock().unwrap().listeners = Some(listeners);
|
self.config.lock().unwrap().listeners = Some(listeners);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_mapped_listeners(&self) -> Vec<url::Url> {
|
||||||
|
self.config
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.mapped_listeners
|
||||||
|
.clone()
|
||||||
|
.unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>) {
|
||||||
|
self.config.lock().unwrap().mapped_listeners = listeners;
|
||||||
|
}
|
||||||
|
|
||||||
fn get_rpc_portal(&self) -> Option<SocketAddr> {
|
fn get_rpc_portal(&self) -> Option<SocketAddr> {
|
||||||
self.config.lock().unwrap().rpc_portal
|
self.config.lock().unwrap().rpc_portal
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -230,7 +230,10 @@ impl GlobalCtx {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_running_listener(&self, url: url::Url) {
|
pub fn add_running_listener(&self, url: url::Url) {
|
||||||
self.running_listeners.lock().unwrap().push(url);
|
let mut l = self.running_listeners.lock().unwrap();
|
||||||
|
if !l.contains(&url) {
|
||||||
|
l.push(url);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_vpn_portal_cidr(&self) -> Option<cidr::Ipv4Cidr> {
|
pub fn get_vpn_portal_cidr(&self) -> Option<cidr::Ipv4Cidr> {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::{
|
use std::{
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
future,
|
future,
|
||||||
|
io::Write as _,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
};
|
};
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
@@ -81,7 +82,17 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_machine_id() -> uuid::Uuid {
|
pub fn get_machine_id() -> uuid::Uuid {
|
||||||
// TODO: load from local file
|
// a path same as the binary
|
||||||
|
let machine_id_file = std::env::current_exe()
|
||||||
|
.map(|x| x.with_file_name("et_machine_id"))
|
||||||
|
.unwrap_or_else(|_| std::path::PathBuf::from("et_machine_id"));
|
||||||
|
|
||||||
|
// try load from local file
|
||||||
|
if let Ok(mid) = std::fs::read_to_string(&machine_id_file) {
|
||||||
|
if let Ok(mid) = uuid::Uuid::parse_str(mid.trim()) {
|
||||||
|
return mid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(any(
|
#[cfg(any(
|
||||||
target_os = "linux",
|
target_os = "linux",
|
||||||
@@ -95,7 +106,7 @@ pub fn get_machine_id() -> uuid::Uuid {
|
|||||||
crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b);
|
crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b);
|
||||||
uuid::Uuid::from_bytes(b)
|
uuid::Uuid::from_bytes(b)
|
||||||
})
|
})
|
||||||
.unwrap_or(uuid::Uuid::new_v4());
|
.ok();
|
||||||
|
|
||||||
#[cfg(not(any(
|
#[cfg(not(any(
|
||||||
target_os = "linux",
|
target_os = "linux",
|
||||||
@@ -103,9 +114,18 @@ pub fn get_machine_id() -> uuid::Uuid {
|
|||||||
target_os = "windows",
|
target_os = "windows",
|
||||||
target_os = "freebsd"
|
target_os = "freebsd"
|
||||||
)))]
|
)))]
|
||||||
|
let gen_mid = None;
|
||||||
|
|
||||||
|
if gen_mid.is_some() {
|
||||||
|
return gen_mid.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
let gen_mid = uuid::Uuid::new_v4();
|
let gen_mid = uuid::Uuid::new_v4();
|
||||||
|
|
||||||
// TODO: save to local file
|
// try save to local file
|
||||||
|
if let Ok(mut file) = std::fs::File::create(machine_id_file) {
|
||||||
|
let _ = file.write_all(gen_mid.to_string().as_bytes());
|
||||||
|
}
|
||||||
|
|
||||||
gen_mid
|
gen_mid
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,13 @@
|
|||||||
// try connect peers directly, with either its public ip or lan ip
|
// try connect peers directly, with either its public ip or lan ip
|
||||||
|
|
||||||
use std::{net::SocketAddr, sync::Arc, time::Duration};
|
use std::{
|
||||||
|
net::SocketAddr,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
|
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
|
||||||
@@ -29,6 +36,8 @@ use super::create_connector_by_url;
|
|||||||
pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1;
|
pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1;
|
||||||
pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
|
pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
|
||||||
|
|
||||||
|
static TESTING: AtomicBool = AtomicBool::new(false);
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait PeerManagerForDirectConnector {
|
pub trait PeerManagerForDirectConnector {
|
||||||
async fn list_peers(&self) -> Vec<PeerId>;
|
async fn list_peers(&self) -> Vec<PeerId>;
|
||||||
@@ -182,7 +191,7 @@ impl DirectConnectorManager {
|
|||||||
|
|
||||||
// let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?;
|
// let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?;
|
||||||
|
|
||||||
if peer_id != dst_peer_id {
|
if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}",
|
"connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}",
|
||||||
addr,
|
addr,
|
||||||
@@ -279,87 +288,103 @@ impl DirectConnectorManager {
|
|||||||
|
|
||||||
let listener_host = listener.socket_addrs(|| None).unwrap().pop();
|
let listener_host = listener.socket_addrs(|| None).unwrap().pop();
|
||||||
match listener_host {
|
match listener_host {
|
||||||
Some(SocketAddr::V4(_)) => {
|
Some(SocketAddr::V4(s_addr)) => {
|
||||||
ip_list.interface_ipv4s.iter().for_each(|ip| {
|
if s_addr.ip().is_unspecified() {
|
||||||
let mut addr = (*listener).clone();
|
ip_list.interface_ipv4s.iter().for_each(|ip| {
|
||||||
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
|
let mut addr = (*listener).clone();
|
||||||
tasks.spawn(Self::try_connect_to_ip(
|
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
|
||||||
data.clone(),
|
tasks.spawn(Self::try_connect_to_ip(
|
||||||
dst_peer_id.clone(),
|
data.clone(),
|
||||||
addr.to_string(),
|
dst_peer_id.clone(),
|
||||||
));
|
addr.to_string(),
|
||||||
} else {
|
));
|
||||||
tracing::error!(
|
} else {
|
||||||
?ip,
|
tracing::error!(
|
||||||
?listener,
|
?ip,
|
||||||
?dst_peer_id,
|
?listener,
|
||||||
"failed to set host for interface ipv4"
|
?dst_peer_id,
|
||||||
);
|
"failed to set host for interface ipv4"
|
||||||
}
|
);
|
||||||
});
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if let Some(public_ipv4) = ip_list.public_ipv4 {
|
if let Some(public_ipv4) = ip_list.public_ipv4 {
|
||||||
let mut addr = (*listener).clone();
|
let mut addr = (*listener).clone();
|
||||||
if addr
|
if addr
|
||||||
.set_host(Some(public_ipv4.to_string().as_str()))
|
.set_host(Some(public_ipv4.to_string().as_str()))
|
||||||
.is_ok()
|
.is_ok()
|
||||||
{
|
{
|
||||||
tasks.spawn(Self::try_connect_to_ip(
|
tasks.spawn(Self::try_connect_to_ip(
|
||||||
data.clone(),
|
data.clone(),
|
||||||
dst_peer_id.clone(),
|
dst_peer_id.clone(),
|
||||||
addr.to_string(),
|
addr.to_string(),
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
?public_ipv4,
|
?public_ipv4,
|
||||||
?listener,
|
?listener,
|
||||||
?dst_peer_id,
|
?dst_peer_id,
|
||||||
"failed to set host for public ipv4"
|
"failed to set host for public ipv4"
|
||||||
);
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
|
||||||
|
tasks.spawn(Self::try_connect_to_ip(
|
||||||
|
data.clone(),
|
||||||
|
dst_peer_id.clone(),
|
||||||
|
listener.to_string(),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(SocketAddr::V6(_)) => {
|
Some(SocketAddr::V6(s_addr)) => {
|
||||||
ip_list.interface_ipv6s.iter().for_each(|ip| {
|
if s_addr.ip().is_unspecified() {
|
||||||
let mut addr = (*listener).clone();
|
ip_list.interface_ipv6s.iter().for_each(|ip| {
|
||||||
if addr
|
let mut addr = (*listener).clone();
|
||||||
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
|
if addr
|
||||||
.is_ok()
|
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
|
||||||
{
|
.is_ok()
|
||||||
tasks.spawn(Self::try_connect_to_ip(
|
{
|
||||||
data.clone(),
|
tasks.spawn(Self::try_connect_to_ip(
|
||||||
dst_peer_id.clone(),
|
data.clone(),
|
||||||
addr.to_string(),
|
dst_peer_id.clone(),
|
||||||
));
|
addr.to_string(),
|
||||||
} else {
|
));
|
||||||
tracing::error!(
|
} else {
|
||||||
?ip,
|
tracing::error!(
|
||||||
?listener,
|
?ip,
|
||||||
?dst_peer_id,
|
?listener,
|
||||||
"failed to set host for interface ipv6"
|
?dst_peer_id,
|
||||||
);
|
"failed to set host for interface ipv6"
|
||||||
}
|
);
|
||||||
});
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if let Some(public_ipv6) = ip_list.public_ipv6 {
|
if let Some(public_ipv6) = ip_list.public_ipv6 {
|
||||||
let mut addr = (*listener).clone();
|
let mut addr = (*listener).clone();
|
||||||
if addr
|
if addr
|
||||||
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
|
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
|
||||||
.is_ok()
|
.is_ok()
|
||||||
{
|
{
|
||||||
tasks.spawn(Self::try_connect_to_ip(
|
tasks.spawn(Self::try_connect_to_ip(
|
||||||
data.clone(),
|
data.clone(),
|
||||||
dst_peer_id.clone(),
|
dst_peer_id.clone(),
|
||||||
addr.to_string(),
|
addr.to_string(),
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
?public_ipv6,
|
?public_ipv6,
|
||||||
?listener,
|
?listener,
|
||||||
?dst_peer_id,
|
?dst_peer_id,
|
||||||
"failed to set host for public ipv6"
|
"failed to set host for public ipv6"
|
||||||
);
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
|
||||||
|
tasks.spawn(Self::try_connect_to_ip(
|
||||||
|
data.clone(),
|
||||||
|
dst_peer_id.clone(),
|
||||||
|
listener.to_string(),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p => {
|
p => {
|
||||||
@@ -452,6 +477,49 @@ mod tests {
|
|||||||
proto::peer_rpc::GetIpListResponse,
|
proto::peer_rpc::GetIpListResponse,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use super::TESTING;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn direct_connector_mapped_listener() {
|
||||||
|
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
let p_a = create_mock_peer_manager().await;
|
||||||
|
let p_b = create_mock_peer_manager().await;
|
||||||
|
let p_c = create_mock_peer_manager().await;
|
||||||
|
let p_x = create_mock_peer_manager().await;
|
||||||
|
connect_peer_manager(p_a.clone(), p_b.clone()).await;
|
||||||
|
connect_peer_manager(p_b.clone(), p_c.clone()).await;
|
||||||
|
connect_peer_manager(p_c.clone(), p_x.clone()).await;
|
||||||
|
|
||||||
|
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
|
||||||
|
wait_route_appear(p_a.clone(), p_x.clone()).await.unwrap();
|
||||||
|
|
||||||
|
let mut f = p_a.get_global_ctx().get_flags();
|
||||||
|
f.bind_device = false;
|
||||||
|
p_a.get_global_ctx().config.set_flags(f);
|
||||||
|
|
||||||
|
p_c.get_global_ctx()
|
||||||
|
.config
|
||||||
|
.set_mapped_listeners(Some(vec!["tcp://127.0.0.1:11334".parse().unwrap()]));
|
||||||
|
|
||||||
|
p_x.get_global_ctx()
|
||||||
|
.config
|
||||||
|
.set_listeners(vec!["tcp://0.0.0.0:11334".parse().unwrap()]);
|
||||||
|
let mut lis_x = ListenerManager::new(p_x.get_global_ctx(), p_x.clone());
|
||||||
|
lis_x.prepare_listeners().await.unwrap();
|
||||||
|
lis_x.run().await.unwrap();
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
|
let mut dm_a = DirectConnectorManager::new(p_a.get_global_ctx(), p_a.clone());
|
||||||
|
let mut dm_c = DirectConnectorManager::new(p_c.get_global_ctx(), p_c.clone());
|
||||||
|
dm_a.run_as_client();
|
||||||
|
dm_c.run_as_server();
|
||||||
|
// p_c's mapped listener is p_x's listener, so p_a should connect to p_x directly
|
||||||
|
|
||||||
|
wait_route_appear_with_cost(p_a.clone(), p_x.my_peer_id(), Some(1))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[rstest::rstest]
|
#[rstest::rstest]
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn direct_connector_basic_test(
|
async fn direct_connector_basic_test(
|
||||||
|
|||||||
@@ -297,12 +297,14 @@ impl ManualConnectorManager {
|
|||||||
|
|
||||||
connector.lock().await.set_ip_version(ip_version);
|
connector.lock().await.set_ip_version(ip_version);
|
||||||
|
|
||||||
set_bind_addr_for_peer_connector(
|
if data.global_ctx.config.get_flags().bind_device {
|
||||||
connector.lock().await.as_mut(),
|
set_bind_addr_for_peer_connector(
|
||||||
ip_version == IpVersion::V4,
|
connector.lock().await.as_mut(),
|
||||||
&ip_collector,
|
ip_version == IpVersion::V4,
|
||||||
)
|
&ip_collector,
|
||||||
.await;
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
data.global_ctx.issue_event(GlobalCtxEvent::Connecting(
|
data.global_ctx.issue_event(GlobalCtxEvent::Connecting(
|
||||||
connector.lock().await.remote_url().clone(),
|
connector.lock().await.remote_url().clone(),
|
||||||
|
|||||||
@@ -56,23 +56,27 @@ pub async fn create_connector_by_url(
|
|||||||
"tcp" => {
|
"tcp" => {
|
||||||
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "tcp")?;
|
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "tcp")?;
|
||||||
let mut connector = TcpTunnelConnector::new(url);
|
let mut connector = TcpTunnelConnector::new(url);
|
||||||
set_bind_addr_for_peer_connector(
|
if global_ctx.config.get_flags().bind_device {
|
||||||
&mut connector,
|
set_bind_addr_for_peer_connector(
|
||||||
dst_addr.is_ipv4(),
|
&mut connector,
|
||||||
&global_ctx.get_ip_collector(),
|
dst_addr.is_ipv4(),
|
||||||
)
|
&global_ctx.get_ip_collector(),
|
||||||
.await;
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
return Ok(Box::new(connector));
|
return Ok(Box::new(connector));
|
||||||
}
|
}
|
||||||
"udp" => {
|
"udp" => {
|
||||||
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "udp")?;
|
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "udp")?;
|
||||||
let mut connector = UdpTunnelConnector::new(url);
|
let mut connector = UdpTunnelConnector::new(url);
|
||||||
set_bind_addr_for_peer_connector(
|
if global_ctx.config.get_flags().bind_device {
|
||||||
&mut connector,
|
set_bind_addr_for_peer_connector(
|
||||||
dst_addr.is_ipv4(),
|
&mut connector,
|
||||||
&global_ctx.get_ip_collector(),
|
dst_addr.is_ipv4(),
|
||||||
)
|
&global_ctx.get_ip_collector(),
|
||||||
.await;
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
return Ok(Box::new(connector));
|
return Ok(Box::new(connector));
|
||||||
}
|
}
|
||||||
"ring" => {
|
"ring" => {
|
||||||
@@ -84,12 +88,14 @@ pub async fn create_connector_by_url(
|
|||||||
"quic" => {
|
"quic" => {
|
||||||
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "quic")?;
|
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "quic")?;
|
||||||
let mut connector = QUICTunnelConnector::new(url);
|
let mut connector = QUICTunnelConnector::new(url);
|
||||||
set_bind_addr_for_peer_connector(
|
if global_ctx.config.get_flags().bind_device {
|
||||||
&mut connector,
|
set_bind_addr_for_peer_connector(
|
||||||
dst_addr.is_ipv4(),
|
&mut connector,
|
||||||
&global_ctx.get_ip_collector(),
|
dst_addr.is_ipv4(),
|
||||||
)
|
&global_ctx.get_ip_collector(),
|
||||||
.await;
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
return Ok(Box::new(connector));
|
return Ok(Box::new(connector));
|
||||||
}
|
}
|
||||||
#[cfg(feature = "wireguard")]
|
#[cfg(feature = "wireguard")]
|
||||||
@@ -101,12 +107,14 @@ pub async fn create_connector_by_url(
|
|||||||
&nid.network_secret.unwrap_or_default(),
|
&nid.network_secret.unwrap_or_default(),
|
||||||
);
|
);
|
||||||
let mut connector = WgTunnelConnector::new(url, wg_config);
|
let mut connector = WgTunnelConnector::new(url, wg_config);
|
||||||
set_bind_addr_for_peer_connector(
|
if global_ctx.config.get_flags().bind_device {
|
||||||
&mut connector,
|
set_bind_addr_for_peer_connector(
|
||||||
dst_addr.is_ipv4(),
|
&mut connector,
|
||||||
&global_ctx.get_ip_collector(),
|
dst_addr.is_ipv4(),
|
||||||
)
|
&global_ctx.get_ip_collector(),
|
||||||
.await;
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
return Ok(Box::new(connector));
|
return Ok(Box::new(connector));
|
||||||
}
|
}
|
||||||
#[cfg(feature = "websocket")]
|
#[cfg(feature = "websocket")]
|
||||||
@@ -114,12 +122,14 @@ pub async fn create_connector_by_url(
|
|||||||
use crate::tunnel::{FromUrl, IpVersion};
|
use crate::tunnel::{FromUrl, IpVersion};
|
||||||
let dst_addr = SocketAddr::from_url(url.clone(), IpVersion::Both)?;
|
let dst_addr = SocketAddr::from_url(url.clone(), IpVersion::Both)?;
|
||||||
let mut connector = crate::tunnel::websocket::WSTunnelConnector::new(url);
|
let mut connector = crate::tunnel::websocket::WSTunnelConnector::new(url);
|
||||||
set_bind_addr_for_peer_connector(
|
if global_ctx.config.get_flags().bind_device {
|
||||||
&mut connector,
|
set_bind_addr_for_peer_connector(
|
||||||
dst_addr.is_ipv4(),
|
&mut connector,
|
||||||
&global_ctx.get_ip_collector(),
|
dst_addr.is_ipv4(),
|
||||||
)
|
&global_ctx.get_ip_collector(),
|
||||||
.await;
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
return Ok(Box::new(connector));
|
return Ok(Box::new(connector));
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
|
|||||||
@@ -123,6 +123,13 @@ struct Cli {
|
|||||||
)]
|
)]
|
||||||
listeners: Vec<String>,
|
listeners: Vec<String>,
|
||||||
|
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
help = t!("core_clap.mapped_listeners").to_string(),
|
||||||
|
num_args = 0..
|
||||||
|
)]
|
||||||
|
mapped_listeners: Vec<String>,
|
||||||
|
|
||||||
#[arg(
|
#[arg(
|
||||||
long,
|
long,
|
||||||
help = t!("core_clap.no_listener").to_string(),
|
help = t!("core_clap.no_listener").to_string(),
|
||||||
@@ -185,7 +192,7 @@ struct Cli {
|
|||||||
#[arg(
|
#[arg(
|
||||||
long,
|
long,
|
||||||
help = t!("core_clap.multi_thread").to_string(),
|
help = t!("core_clap.multi_thread").to_string(),
|
||||||
default_value = "false"
|
default_value = "true"
|
||||||
)]
|
)]
|
||||||
multi_thread: bool,
|
multi_thread: bool,
|
||||||
|
|
||||||
@@ -300,6 +307,12 @@ struct Cli {
|
|||||||
default_value = "none",
|
default_value = "none",
|
||||||
)]
|
)]
|
||||||
compression: String,
|
compression: String,
|
||||||
|
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
help = t!("core_clap.bind_device").to_string()
|
||||||
|
)]
|
||||||
|
bind_device: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
rust_i18n::i18n!("locales", fallback = "en");
|
rust_i18n::i18n!("locales", fallback = "en");
|
||||||
@@ -422,6 +435,23 @@ impl TryFrom<&Cli> for TomlConfigLoader {
|
|||||||
.collect(),
|
.collect(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
cfg.set_mapped_listeners(Some(
|
||||||
|
cli.mapped_listeners
|
||||||
|
.iter()
|
||||||
|
.map(|s| {
|
||||||
|
s.parse()
|
||||||
|
.with_context(|| format!("mapped listener is not a valid url: {}", s))
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
.map(|s: url::Url| {
|
||||||
|
if s.port().is_none() {
|
||||||
|
panic!("mapped listener port is missing: {}", s);
|
||||||
|
}
|
||||||
|
s
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
));
|
||||||
|
|
||||||
for n in cli.proxy_networks.iter() {
|
for n in cli.proxy_networks.iter() {
|
||||||
cfg.add_proxy_cidr(
|
cfg.add_proxy_cidr(
|
||||||
n.parse()
|
n.parse()
|
||||||
@@ -534,6 +564,9 @@ impl TryFrom<&Cli> for TomlConfigLoader {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
.into();
|
.into();
|
||||||
|
if let Some(bind_device) = cli.bind_device {
|
||||||
|
f.bind_device = bind_device;
|
||||||
|
}
|
||||||
cfg.set_flags(f);
|
cfg.set_flags(f);
|
||||||
|
|
||||||
cfg.set_exit_nodes(cli.exit_nodes.clone());
|
cfg.set_exit_nodes(cli.exit_nodes.clone());
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use std::{
|
|||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use bytes::{BufMut, BytesMut};
|
||||||
use cidr::Ipv4Inet;
|
use cidr::Ipv4Inet;
|
||||||
use crossbeam::atomic::AtomicCell;
|
use crossbeam::atomic::AtomicCell;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
@@ -24,11 +25,11 @@ use tokio::{
|
|||||||
use tracing::Level;
|
use tracing::Level;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
|
common::{error::Error, global_ctx::ArcGlobalCtx, scoped_task::ScopedTask, PeerId},
|
||||||
gateway::ip_reassembler::compose_ipv4_packet,
|
gateway::ip_reassembler::compose_ipv4_packet,
|
||||||
peers::{peer_manager::PeerManager, PeerPacketFilter},
|
peers::{peer_manager::PeerManager, PeerPacketFilter},
|
||||||
tunnel::{
|
tunnel::{
|
||||||
common::setup_sokcet2,
|
common::{reserve_buf, setup_sokcet2},
|
||||||
packet_def::{PacketType, ZCPacket},
|
packet_def::{PacketType, ZCPacket},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
@@ -139,59 +140,81 @@ impl UdpNatEntry {
|
|||||||
mut packet_sender: Sender<ZCPacket>,
|
mut packet_sender: Sender<ZCPacket>,
|
||||||
virtual_ipv4: Ipv4Addr,
|
virtual_ipv4: Ipv4Addr,
|
||||||
) {
|
) {
|
||||||
let mut buf = [0u8; 65536];
|
let (s, mut r) = tachyonix::channel(128);
|
||||||
let mut udp_body: &mut [u8] = unsafe { std::mem::transmute(&mut buf[20 + 8..]) };
|
|
||||||
let mut ip_id = 1;
|
|
||||||
|
|
||||||
loop {
|
let self_clone = self.clone();
|
||||||
let (len, src_socket) = match timeout(
|
let recv_task = ScopedTask::from(tokio::spawn(async move {
|
||||||
Duration::from_secs(120),
|
let mut cur_buf = BytesMut::new();
|
||||||
self.socket.recv_from(&mut udp_body),
|
loop {
|
||||||
)
|
if self_clone
|
||||||
.await
|
.stopped
|
||||||
{
|
.load(std::sync::atomic::Ordering::Relaxed)
|
||||||
Ok(Ok(x)) => x,
|
{
|
||||||
Ok(Err(err)) => {
|
|
||||||
tracing::error!(?err, "udp nat recv failed");
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(err) => {
|
|
||||||
tracing::error!(?err, "udp nat recv timeout");
|
reserve_buf(&mut cur_buf, 64 * 1024 + 28, 128 * 1024 + 28);
|
||||||
break;
|
assert_eq!(cur_buf.len(), 0);
|
||||||
|
unsafe {
|
||||||
|
cur_buf.advance_mut(28);
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
|
let (len, src_socket) = match timeout(
|
||||||
|
Duration::from_secs(120),
|
||||||
|
self_clone.socket.recv_buf_from(&mut cur_buf),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(Ok(x)) => x,
|
||||||
|
Ok(Err(err)) => {
|
||||||
|
tracing::error!(?err, "udp nat recv failed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!(?err, "udp nat recv timeout");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if self.stopped.load(std::sync::atomic::Ordering::Relaxed) {
|
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
|
||||||
break;
|
|
||||||
|
let ret_buf = cur_buf.split();
|
||||||
|
s.send((ret_buf, len, src_socket)).await.unwrap();
|
||||||
}
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
let SocketAddr::V4(mut src_v4) = src_socket else {
|
let self_clone = self.clone();
|
||||||
continue;
|
let send_task = ScopedTask::from(tokio::spawn(async move {
|
||||||
};
|
let mut ip_id = 1;
|
||||||
|
while let Ok((mut packet, len, src_socket)) = r.recv().await {
|
||||||
|
let SocketAddr::V4(mut src_v4) = src_socket else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
self.mark_active();
|
self_clone.mark_active();
|
||||||
|
|
||||||
if src_v4.ip().is_loopback() {
|
if src_v4.ip().is_loopback() {
|
||||||
src_v4.set_ip(virtual_ipv4);
|
src_v4.set_ip(virtual_ipv4);
|
||||||
|
}
|
||||||
|
|
||||||
|
let Ok(_) = Self::compose_ipv4_packet(
|
||||||
|
&self_clone,
|
||||||
|
&mut packet_sender,
|
||||||
|
&mut packet,
|
||||||
|
&src_v4,
|
||||||
|
len,
|
||||||
|
1280,
|
||||||
|
ip_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
ip_id = ip_id.wrapping_add(1);
|
||||||
}
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
let Ok(_) = Self::compose_ipv4_packet(
|
let _ = tokio::join!(recv_task, send_task);
|
||||||
&self,
|
|
||||||
&mut packet_sender,
|
|
||||||
&mut buf,
|
|
||||||
&src_v4,
|
|
||||||
len,
|
|
||||||
1200,
|
|
||||||
ip_id,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
else {
|
|
||||||
break;
|
|
||||||
};
|
|
||||||
ip_id = ip_id.wrapping_add(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.stop();
|
self.stop();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,7 @@
|
|||||||
use std::{fmt::Debug, sync::Arc};
|
use std::{fmt::Debug, sync::Arc};
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use tokio::{sync::Mutex, task::JoinSet};
|
use tokio::task::JoinSet;
|
||||||
|
|
||||||
#[cfg(feature = "quic")]
|
#[cfg(feature = "quic")]
|
||||||
use crate::tunnel::quic::QUICTunnelListener;
|
use crate::tunnel::quic::QUICTunnelListener;
|
||||||
@@ -63,16 +62,20 @@ impl TunnelHandlerForListener for PeerManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
pub trait ListenerCreatorTrait: Fn() -> Box<dyn TunnelListener> + Send + Sync {}
|
||||||
struct Listener {
|
impl<T: Send + Sync> ListenerCreatorTrait for T where T: Fn() -> Box<dyn TunnelListener> + Send {}
|
||||||
inner: Arc<Mutex<dyn TunnelListener>>,
|
pub type ListenerCreator = Box<dyn ListenerCreatorTrait>;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct ListenerFactory {
|
||||||
|
creator_fn: Arc<ListenerCreator>,
|
||||||
must_succ: bool,
|
must_succ: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ListenerManager<H> {
|
pub struct ListenerManager<H> {
|
||||||
global_ctx: ArcGlobalCtx,
|
global_ctx: ArcGlobalCtx,
|
||||||
net_ns: NetNS,
|
net_ns: NetNS,
|
||||||
listeners: Vec<Listener>,
|
listeners: Vec<ListenerFactory>,
|
||||||
peer_manager: Arc<H>,
|
peer_manager: Arc<H>,
|
||||||
|
|
||||||
tasks: JoinSet<()>,
|
tasks: JoinSet<()>,
|
||||||
@@ -90,31 +93,39 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn prepare_listeners(&mut self) -> Result<(), Error> {
|
pub async fn prepare_listeners(&mut self) -> Result<(), Error> {
|
||||||
|
let self_id = self.global_ctx.get_id();
|
||||||
self.add_listener(
|
self.add_listener(
|
||||||
RingTunnelListener::new(
|
move || {
|
||||||
format!("ring://{}", self.global_ctx.get_id())
|
Box::new(RingTunnelListener::new(
|
||||||
.parse()
|
format!("ring://{}", self_id).parse().unwrap(),
|
||||||
.unwrap(),
|
))
|
||||||
),
|
},
|
||||||
true,
|
true,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
for l in self.global_ctx.config.get_listener_uris().iter() {
|
for l in self.global_ctx.config.get_listener_uris().iter() {
|
||||||
let Ok(lis) = get_listener_by_url(l, self.global_ctx.clone()) else {
|
let l = l.clone();
|
||||||
|
let Ok(_) = get_listener_by_url(&l, self.global_ctx.clone()) else {
|
||||||
let msg = format!("failed to get listener by url: {}, maybe not supported", l);
|
let msg = format!("failed to get listener by url: {}, maybe not supported", l);
|
||||||
self.global_ctx
|
self.global_ctx
|
||||||
.issue_event(GlobalCtxEvent::ListenerAddFailed(l.clone(), msg));
|
.issue_event(GlobalCtxEvent::ListenerAddFailed(l.clone(), msg));
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
self.add_listener(lis, true).await?;
|
let ctx = self.global_ctx.clone();
|
||||||
|
self.add_listener(move || get_listener_by_url(&l, ctx.clone()).unwrap(), true)
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.global_ctx.config.get_flags().enable_ipv6 {
|
if self.global_ctx.config.get_flags().enable_ipv6 {
|
||||||
let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone();
|
let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone();
|
||||||
let _ = self
|
let _ = self
|
||||||
.add_listener(
|
.add_listener(
|
||||||
UdpTunnelListener::new(ipv6_listener.parse().unwrap()),
|
move || {
|
||||||
|
Box::new(UdpTunnelListener::new(
|
||||||
|
ipv6_listener.clone().parse().unwrap(),
|
||||||
|
))
|
||||||
|
},
|
||||||
false,
|
false,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -123,85 +134,91 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn add_listener<L>(&mut self, listener: L, must_succ: bool) -> Result<(), Error>
|
pub async fn add_listener<C: ListenerCreatorTrait + 'static>(
|
||||||
where
|
&mut self,
|
||||||
L: TunnelListener + 'static,
|
creator: C,
|
||||||
{
|
must_succ: bool,
|
||||||
let listener = Arc::new(Mutex::new(listener));
|
) -> Result<(), Error> {
|
||||||
self.listeners.push(Listener {
|
self.listeners.push(ListenerFactory {
|
||||||
inner: listener,
|
creator_fn: Arc::new(Box::new(creator)),
|
||||||
must_succ,
|
must_succ,
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument]
|
#[tracing::instrument(skip(creator))]
|
||||||
async fn run_listener(
|
async fn run_listener(
|
||||||
listener: Arc<Mutex<dyn TunnelListener>>,
|
creator: Arc<ListenerCreator>,
|
||||||
peer_manager: Arc<H>,
|
peer_manager: Arc<H>,
|
||||||
global_ctx: ArcGlobalCtx,
|
global_ctx: ArcGlobalCtx,
|
||||||
) {
|
) {
|
||||||
let mut l = listener.lock().await;
|
|
||||||
global_ctx.add_running_listener(l.local_url());
|
|
||||||
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
|
|
||||||
loop {
|
loop {
|
||||||
let ret = match l.accept().await {
|
let mut l = (creator)();
|
||||||
Ok(ret) => ret,
|
let _g = global_ctx.net_ns.guard();
|
||||||
|
match l.listen().await {
|
||||||
|
Ok(_) => {
|
||||||
|
global_ctx.add_running_listener(l.local_url());
|
||||||
|
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed(
|
global_ctx.issue_event(GlobalCtxEvent::ListenerAddFailed(
|
||||||
l.local_url(),
|
l.local_url(),
|
||||||
e.to_string(),
|
format!("error: {:?}, retry listen later...", e),
|
||||||
));
|
));
|
||||||
tracing::error!(?e, ?l, "listener accept error");
|
tracing::error!(?e, ?l, "listener listen error");
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
loop {
|
||||||
|
let ret = match l.accept().await {
|
||||||
|
Ok(ret) => ret,
|
||||||
|
Err(e) => {
|
||||||
|
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed(
|
||||||
|
l.local_url(),
|
||||||
|
format!("error: {:?}, retry listen later...", e),
|
||||||
|
));
|
||||||
|
tracing::error!(?e, ?l, "listener accept error");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let tunnel_info = ret.info().unwrap();
|
let tunnel_info = ret.info().unwrap();
|
||||||
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
|
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
|
||||||
tunnel_info
|
tunnel_info
|
||||||
.local_addr
|
.local_addr
|
||||||
.clone()
|
.clone()
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.to_string(),
|
.to_string(),
|
||||||
tunnel_info
|
tunnel_info
|
||||||
.remote_addr
|
.remote_addr
|
||||||
.clone()
|
.clone()
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.to_string(),
|
.to_string(),
|
||||||
));
|
));
|
||||||
tracing::info!(ret = ?ret, "conn accepted");
|
tracing::info!(ret = ?ret, "conn accepted");
|
||||||
let peer_manager = peer_manager.clone();
|
let peer_manager = peer_manager.clone();
|
||||||
let global_ctx = global_ctx.clone();
|
let global_ctx = global_ctx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let server_ret = peer_manager.handle_tunnel(ret).await;
|
let server_ret = peer_manager.handle_tunnel(ret).await;
|
||||||
if let Err(e) = &server_ret {
|
if let Err(e) = &server_ret {
|
||||||
global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
|
global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
|
||||||
tunnel_info.local_addr.unwrap_or_default().to_string(),
|
tunnel_info.local_addr.unwrap_or_default().to_string(),
|
||||||
tunnel_info.remote_addr.unwrap_or_default().to_string(),
|
tunnel_info.remote_addr.unwrap_or_default().to_string(),
|
||||||
e.to_string(),
|
e.to_string(),
|
||||||
));
|
));
|
||||||
tracing::error!(error = ?e, "handle conn error");
|
tracing::error!(error = ?e, "handle conn error");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self) -> Result<(), Error> {
|
pub async fn run(&mut self) -> Result<(), Error> {
|
||||||
for listener in &self.listeners {
|
for listener in &self.listeners {
|
||||||
let _guard = self.net_ns.guard();
|
|
||||||
let addr = listener.inner.lock().await.local_url();
|
|
||||||
tracing::warn!("run listener: {:?}", listener);
|
|
||||||
listener
|
|
||||||
.inner
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.listen()
|
|
||||||
.await
|
|
||||||
.with_context(|| format!("failed to add listener {}", addr))?;
|
|
||||||
self.tasks.spawn(Self::run_listener(
|
self.tasks.spawn(Self::run_listener(
|
||||||
listener.inner.clone(),
|
listener.creator_fn.clone(),
|
||||||
self.peer_manager.clone(),
|
self.peer_manager.clone(),
|
||||||
self.global_ctx.clone(),
|
self.global_ctx.clone(),
|
||||||
));
|
));
|
||||||
@@ -213,12 +230,14 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::sync::atomic::{AtomicI32, Ordering};
|
||||||
|
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
common::global_ctx::tests::get_mock_global_ctx,
|
common::global_ctx::tests::get_mock_global_ctx,
|
||||||
tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector},
|
tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector, TunnelError},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -245,12 +264,18 @@ mod tests {
|
|||||||
|
|
||||||
let ring_id = format!("ring://{}", uuid::Uuid::new_v4());
|
let ring_id = format!("ring://{}", uuid::Uuid::new_v4());
|
||||||
|
|
||||||
|
let ring_id_clone = ring_id.clone();
|
||||||
listener_mgr
|
listener_mgr
|
||||||
.add_listener(RingTunnelListener::new(ring_id.parse().unwrap()), true)
|
.add_listener(
|
||||||
|
move || Box::new(RingTunnelListener::new(ring_id_clone.parse().unwrap())),
|
||||||
|
true,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
listener_mgr.run().await.unwrap();
|
listener_mgr.run().await.unwrap();
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
|
|
||||||
let connect_once = |ring_id| async move {
|
let connect_once = |ring_id| async move {
|
||||||
let tunnel = RingTunnelConnector::new(ring_id).connect().await.unwrap();
|
let tunnel = RingTunnelConnector::new(ring_id).connect().await.unwrap();
|
||||||
let (mut recv, _send) = tunnel.split();
|
let (mut recv, _send) = tunnel.split();
|
||||||
@@ -269,4 +294,60 @@ mod tests {
|
|||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn retry_listen() {
|
||||||
|
let counter = Arc::new(AtomicI32::new(0));
|
||||||
|
let drop_counter = Arc::new(AtomicI32::new(0));
|
||||||
|
struct MockListener {
|
||||||
|
counter: Arc<AtomicI32>,
|
||||||
|
drop_counter: Arc<AtomicI32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl TunnelListener for MockListener {
|
||||||
|
fn local_url(&self) -> url::Url {
|
||||||
|
"mock://".parse().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen(&mut self) -> Result<(), TunnelError> {
|
||||||
|
self.counter.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
|
Err(TunnelError::BufferFull)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for MockListener {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.drop_counter.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let handler = Arc::new(MockListenerHandler {});
|
||||||
|
let mut listener_mgr = ListenerManager::new(get_mock_global_ctx(), handler.clone());
|
||||||
|
let counter_clone = counter.clone();
|
||||||
|
let drop_counter_clone = drop_counter.clone();
|
||||||
|
listener_mgr
|
||||||
|
.add_listener(
|
||||||
|
move || {
|
||||||
|
Box::new(MockListener {
|
||||||
|
counter: counter_clone.clone(),
|
||||||
|
drop_counter: drop_counter_clone.clone(),
|
||||||
|
})
|
||||||
|
},
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
listener_mgr.run().await.unwrap();
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
assert!(counter.load(Ordering::Relaxed) >= 2);
|
||||||
|
assert!(drop_counter.load(Ordering::Relaxed) >= 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -484,13 +484,6 @@ mod tests {
|
|||||||
let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
|
let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
|
||||||
let c = TunnelWithFilter::new(c, c_recorder.clone());
|
let c = TunnelWithFilter::new(c, c_recorder.clone());
|
||||||
|
|
||||||
let s = if drop_both {
|
|
||||||
let s_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
|
|
||||||
Box::new(TunnelWithFilter::new(s, s_recorder.clone()))
|
|
||||||
} else {
|
|
||||||
s
|
|
||||||
};
|
|
||||||
|
|
||||||
let c_peer_id = new_peer_id();
|
let c_peer_id = new_peer_id();
|
||||||
let s_peer_id = new_peer_id();
|
let s_peer_id = new_peer_id();
|
||||||
|
|
||||||
@@ -506,6 +499,7 @@ mod tests {
|
|||||||
s_peer
|
s_peer
|
||||||
.start_recv_loop(tokio::sync::mpsc::channel(200).0)
|
.start_recv_loop(tokio::sync::mpsc::channel(200).0)
|
||||||
.await;
|
.await;
|
||||||
|
// do not start ping for s, s only reponde to ping from c
|
||||||
|
|
||||||
assert!(c_ret.is_ok());
|
assert!(c_ret.is_ok());
|
||||||
assert!(s_ret.is_ok());
|
assert!(s_ret.is_ok());
|
||||||
@@ -547,7 +541,7 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn peer_conn_pingpong_bothside_timeout() {
|
async fn peer_conn_pingpong_bothside_timeout() {
|
||||||
peer_conn_pingpong_test_common(4, 12, true, true).await;
|
peer_conn_pingpong_test_common(3, 14, true, true).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|||||||
@@ -289,29 +289,29 @@ impl PeerConnPinger {
|
|||||||
"pingpong task recv pingpong_once result"
|
"pingpong task recv pingpong_once result"
|
||||||
);
|
);
|
||||||
|
|
||||||
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) {
|
let current_rx_packets = throughput.rx_packets();
|
||||||
let current_rx_packets = throughput.rx_packets();
|
if last_rx_packets != current_rx_packets {
|
||||||
let need_close = if last_rx_packets != current_rx_packets {
|
// if we receive some packet from peers, reset the counter to avoid conn close.
|
||||||
// if we receive some packet from peers, we should relax the condition
|
// conn will close only if we have 5 continous round pingpong loss after no packet received.
|
||||||
counter > 50 && loss_rate_1 > 0.5
|
counter = 0;
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: wait more time to see if the loss rate is still high after no rx
|
tracing::debug!(
|
||||||
} else {
|
"counter: {}, loss_rate_1: {}, loss_rate_20: {}, cur_rx_packets: {}, last_rx: {}, node_id: {}",
|
||||||
true
|
counter, loss_rate_1, loss_rate_20, current_rx_packets, last_rx_packets, my_node_id
|
||||||
};
|
);
|
||||||
|
|
||||||
if need_close {
|
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 100 && loss_rate_1 > 0.35) {
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
?ret,
|
?ret,
|
||||||
?self,
|
?self,
|
||||||
?loss_rate_1,
|
?loss_rate_1,
|
||||||
?loss_rate_20,
|
?loss_rate_20,
|
||||||
?last_rx_packets,
|
?last_rx_packets,
|
||||||
?current_rx_packets,
|
?current_rx_packets,
|
||||||
"pingpong loss rate too high, closing"
|
"pingpong loss rate too high, closing"
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
last_rx_packets = throughput.rx_packets();
|
last_rx_packets = throughput.rx_packets();
|
||||||
|
|||||||
@@ -1739,7 +1739,6 @@ impl RouteSessionManager {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let _ = self.stop_session(*peer_id);
|
let _ = self.stop_session(*peer_id);
|
||||||
assert_ne!(Some(*peer_id), cur_dst_peer_id_to_initiate);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,8 +24,10 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer {
|
|||||||
let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await;
|
let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await;
|
||||||
ret.listeners = self
|
ret.listeners = self
|
||||||
.global_ctx
|
.global_ctx
|
||||||
.get_running_listeners()
|
.config
|
||||||
|
.get_mapped_listeners()
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
.chain(self.global_ctx.get_running_listeners().into_iter())
|
||||||
.map(Into::into)
|
.map(Into::into)
|
||||||
.collect();
|
.collect();
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ message FlagsInConfig {
|
|||||||
string ipv6_listener = 14;
|
string ipv6_listener = 14;
|
||||||
bool multi_thread = 15;
|
bool multi_thread = 15;
|
||||||
CompressionAlgoPb data_compress_algo = 16;
|
CompressionAlgoPb data_compress_algo = 16;
|
||||||
|
bool bind_device = 17;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RpcDescriptor {
|
message RpcDescriptor {
|
||||||
|
|||||||
@@ -159,7 +159,10 @@ pub fn build_rpc_packet(
|
|||||||
let cur_packet = RpcPacket {
|
let cur_packet = RpcPacket {
|
||||||
from_peer,
|
from_peer,
|
||||||
to_peer,
|
to_peer,
|
||||||
descriptor: if cur_offset == 0 {
|
descriptor: if cur_offset == 0
|
||||||
|
|| compression_info.algo == CompressionAlgoPb::None as i32
|
||||||
|
{
|
||||||
|
// old version must have descriptor on every piece
|
||||||
Some(rpc_desc.clone())
|
Some(rpc_desc.clone())
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
|||||||
+40
-20
@@ -28,6 +28,30 @@ impl TcpTunnelListener {
|
|||||||
listener: None,
|
listener: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn do_accept(&mut self) -> Result<Box<dyn Tunnel>, std::io::Error> {
|
||||||
|
let listener = self.listener.as_ref().unwrap();
|
||||||
|
let (stream, _) = listener.accept().await?;
|
||||||
|
|
||||||
|
if let Err(e) = stream.set_nodelay(true) {
|
||||||
|
tracing::warn!(?e, "set_nodelay fail in accept");
|
||||||
|
}
|
||||||
|
|
||||||
|
let info = TunnelInfo {
|
||||||
|
tunnel_type: "tcp".to_owned(),
|
||||||
|
local_addr: Some(self.local_url().into()),
|
||||||
|
remote_addr: Some(
|
||||||
|
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
let (r, w) = stream.into_split();
|
||||||
|
Ok(Box::new(TunnelWrapper::new(
|
||||||
|
FramedReader::new(r, TCP_MTU_BYTES),
|
||||||
|
FramedWriter::new(w),
|
||||||
|
Some(info),
|
||||||
|
)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -57,27 +81,23 @@ impl TunnelListener for TcpTunnelListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> {
|
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> {
|
||||||
let listener = self.listener.as_ref().unwrap();
|
loop {
|
||||||
let (stream, _) = listener.accept().await?;
|
match self.do_accept().await {
|
||||||
|
Ok(ret) => return Ok(ret),
|
||||||
if let Err(e) = stream.set_nodelay(true) {
|
Err(e) => {
|
||||||
tracing::warn!(?e, "set_nodelay fail in accept");
|
use std::io::ErrorKind::*;
|
||||||
|
if matches!(
|
||||||
|
e.kind(),
|
||||||
|
NotConnected | ConnectionAborted | ConnectionRefused | ConnectionReset
|
||||||
|
) {
|
||||||
|
tracing::warn!(?e, "accept fail with retryable error: {:?}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
tracing::warn!(?e, "accept fail");
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let info = TunnelInfo {
|
|
||||||
tunnel_type: "tcp".to_owned(),
|
|
||||||
local_addr: Some(self.local_url().into()),
|
|
||||||
remote_addr: Some(
|
|
||||||
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
|
|
||||||
),
|
|
||||||
};
|
|
||||||
|
|
||||||
let (r, w) = stream.into_split();
|
|
||||||
Ok(Box::new(TunnelWrapper::new(
|
|
||||||
FramedReader::new(r, TCP_MTU_BYTES),
|
|
||||||
FramedWriter::new(w),
|
|
||||||
Some(info),
|
|
||||||
)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn local_url(&self) -> url::Url {
|
fn local_url(&self) -> url::Url {
|
||||||
|
|||||||
Reference in New Issue
Block a user