Add instance recv limiter in peer conn (#2027)

This commit is contained in:
KKRainbow
2026-03-29 10:28:02 +08:00
committed by GitHub
parent d4c1b0e867
commit bcd75d6ce3
14 changed files with 156 additions and 10 deletions
+7
View File
@@ -56,6 +56,13 @@ jobs:
- uses: taiki-e/install-action@cargo-hack - uses: taiki-e/install-action@cargo-hack
- name: Check Cargo.lock is up to date
run: |
if ! cargo metadata --format-version 1 --locked --no-deps > /dev/null; then
echo "::error::Cargo.lock is out of date. Run cargo generate-lockfile or cargo build locally, then commit Cargo.lock."
exit 1
fi
- name: Check formatting - name: Check formatting
run: cargo fmt --all -- --check run: cargo fmt --all -- --check
Generated
+44 -7
View File
@@ -2191,6 +2191,7 @@ dependencies = [
"easytier-rpc-build", "easytier-rpc-build",
"encoding", "encoding",
"flume 0.12.0", "flume 0.12.0",
"forwarded-header-value",
"futures", "futures",
"futures-util", "futures-util",
"gethostname 0.5.0", "gethostname 0.5.0",
@@ -2208,6 +2209,7 @@ dependencies = [
"humantime-serde", "humantime-serde",
"idna 1.0.3", "idna 1.0.3",
"indoc", "indoc",
"itertools 0.14.0",
"kcp-sys", "kcp-sys",
"machine-uid", "machine-uid",
"maplit", "maplit",
@@ -2920,6 +2922,16 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "forwarded-header-value"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8835f84f38484cc86f110a805655697908257fb9a7af005234060891557198e9"
dependencies = [
"nonempty",
"thiserror 1.0.63",
]
[[package]] [[package]]
name = "fragile" name = "fragile"
version = "2.0.1" version = "2.0.1"
@@ -3770,7 +3782,7 @@ dependencies = [
"rustls-pki-types", "rustls-pki-types",
"unicase", "unicase",
"webpki", "webpki",
"webpki-roots", "webpki-roots 0.26.3",
"zeroize", "zeroize",
] ]
@@ -3847,7 +3859,7 @@ dependencies = [
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tower-service", "tower-service",
"webpki-roots", "webpki-roots 0.26.3",
] ]
[[package]] [[package]]
@@ -4319,6 +4331,15 @@ dependencies = [
"either", "either",
] ]
[[package]]
name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.11" version = "1.0.11"
@@ -5205,6 +5226,12 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "nonempty"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7"
[[package]] [[package]]
name = "normpath" name = "normpath"
version = "1.3.0" version = "1.3.0"
@@ -7137,7 +7164,7 @@ dependencies = [
"wasm-bindgen-futures", "wasm-bindgen-futures",
"wasm-streams", "wasm-streams",
"web-sys", "web-sys",
"webpki-roots", "webpki-roots 0.26.3",
"windows-registry", "windows-registry",
] ]
@@ -8460,7 +8487,7 @@ dependencies = [
"tracing", "tracing",
"url", "url",
"uuid", "uuid",
"webpki-roots", "webpki-roots 0.26.3",
] ]
[[package]] [[package]]
@@ -9558,9 +9585,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-websockets" name = "tokio-websockets"
version = "0.8.3" version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "842e11addde61da7c37ef205cd625ebcd7b607076ea62e4698f06bfd5fd01a03" checksum = "dad543404f98bfc969aeb71994105c592acfc6c43323fddcd016bb208d1c65cb"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
@@ -9571,10 +9598,11 @@ dependencies = [
"httparse", "httparse",
"ring", "ring",
"rustls-pki-types", "rustls-pki-types",
"simdutf8",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tokio-util", "tokio-util",
"webpki-roots", "webpki-roots 1.0.6",
] ]
[[package]] [[package]]
@@ -10675,6 +10703,15 @@ dependencies = [
"rustls-pki-types", "rustls-pki-types",
] ]
[[package]]
name = "webpki-roots"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed"
dependencies = [
"rustls-pki-types",
]
[[package]] [[package]]
name = "webview2-com" name = "webview2-com"
version = "0.38.0" version = "0.38.0"
@@ -305,6 +305,19 @@ watch(() => curNetwork.value, syncNormalizedNetwork, { immediate: true, deep: fa
</div> </div>
</div> </div>
<div class="flex flex-row gap-x-9 flex-wrap">
<div class="flex flex-col gap-2 basis-5/12 grow">
<div class="flex">
<label for="instance_recv_bps_limit">{{ t('instance_recv_bps_limit') }}</label>
<span class="pi pi-question-circle ml-2 self-center"
v-tooltip="t('instance_recv_bps_limit_help')"></span>
</div>
<InputNumber id="instance_recv_bps_limit" v-model="curNetwork.instance_recv_bps_limit"
aria-describedby="instance_recv_bps_limit-help" :format="false"
:placeholder="t('instance_recv_bps_limit_placeholder')" :min="1" fluid />
</div>
</div>
<div class="flex flex-row gap-x-9 flex-wrap"> <div class="flex flex-row gap-x-9 flex-wrap">
<div class="flex flex-col gap-2 basis-5/12 grow"> <div class="flex flex-col gap-2 basis-5/12 grow">
<div class="flex"> <div class="flex">
@@ -196,6 +196,12 @@ mtu_help: |
TUN设备的MTU,默认为非加密时为1380,加密时为1360。范围:400-1380 TUN设备的MTU,默认为非加密时为1380,加密时为1360。范围:400-1380
mtu_placeholder: 留空为默认值1380 mtu_placeholder: 留空为默认值1380
instance_recv_bps_limit: 实例接收限速
instance_recv_bps_limit_help: |
限制当前实例整体入站流量的总接收速率,单位为字节每秒。
留空表示不限速。
instance_recv_bps_limit_placeholder: 留空表示不限速
mapped_listeners: 监听映射 mapped_listeners: 监听映射
mapped_listeners_help: | mapped_listeners_help: |
手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。 手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。
@@ -196,6 +196,12 @@ mtu_help: |
MTU of the TUN device, default is 1380 for non-encryption, 1360 for encryption. Range:400-1380 MTU of the TUN device, default is 1380 for non-encryption, 1360 for encryption. Range:400-1380
mtu_placeholder: Leave blank as default value 1380 mtu_placeholder: Leave blank as default value 1380
instance_recv_bps_limit: Instance Receive Limit
instance_recv_bps_limit_help: |
Limit the total receive bandwidth for the whole instance. Unit: bytes per second.
Leave blank for no limit.
instance_recv_bps_limit_placeholder: Leave blank for no limit
mapped_listeners: Map Listeners mapped_listeners: Map Listeners
mapped_listeners_help: | mapped_listeners_help: |
Manually specify the public address of the listener, other nodes can use this address to connect to this node. Manually specify the public address of the listener, other nodes can use this address to connect to this node.
@@ -78,6 +78,7 @@ export interface NetworkConfig {
socks5_port: number socks5_port: number
mtu: number | null mtu: number | null
instance_recv_bps_limit: number | null
mapped_listeners: string[] mapped_listeners: string[]
enable_magic_dns?: boolean enable_magic_dns?: boolean
@@ -146,6 +147,7 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig {
enable_socks5: false, enable_socks5: false,
socks5_port: 1080, socks5_port: 1080,
mtu: null, mtu: null,
instance_recv_bps_limit: null,
mapped_listeners: [], mapped_listeners: [],
enable_magic_dns: false, enable_magic_dns: false,
enable_private_mode: false, enable_private_mode: false,
+3
View File
@@ -217,6 +217,9 @@ core_clap:
foreign_relay_bps_limit: foreign_relay_bps_limit:
en: "the maximum bps limit for foreign network relay, default is no limit. unit: BPS (bytes per second)" en: "the maximum bps limit for foreign network relay, default is no limit. unit: BPS (bytes per second)"
zh-CN: "作为共享节点时,限制非本地网络的流量转发速率,默认无限制,单位 BPS (字节每秒)" zh-CN: "作为共享节点时,限制非本地网络的流量转发速率,默认无限制,单位 BPS (字节每秒)"
instance_recv_bps_limit:
en: "the maximum total receive bps limit for this instance, default is no limit. unit: BPS (bytes per second)"
zh-CN: "限制当前网络实例整体入站流量的总接收速率,默认无限制,单位 BPS (字节每秒)"
tcp_whitelist: tcp_whitelist:
en: "tcp port whitelist. Supports single ports (80) and ranges (8000-9000)" en: "tcp port whitelist. Supports single ports (80) and ranges (8000-9000)"
zh-CN: "TCP 端口白名单。支持单个端口(80)和范围(8000-9000" zh-CN: "TCP 端口白名单。支持单个端口(80)和范围(8000-9000"
+1
View File
@@ -69,6 +69,7 @@ pub fn gen_default_flags() -> Flags {
quic_listen_port: u32::MAX, quic_listen_port: u32::MAX,
need_p2p: false, need_p2p: false,
instance_recv_bps_limit: u64::MAX,
} }
} }
+10
View File
@@ -560,6 +560,13 @@ struct NetworkOptions {
)] )]
foreign_relay_bps_limit: Option<u64>, foreign_relay_bps_limit: Option<u64>,
#[arg(
long,
env = "ET_INSTANCE_RECV_BPS_LIMIT",
help = t!("core_clap.instance_recv_bps_limit").to_string(),
)]
instance_recv_bps_limit: Option<u64>,
#[arg( #[arg(
long, long,
value_delimiter = ',', value_delimiter = ',',
@@ -1060,6 +1067,9 @@ impl NetworkOptions {
f.foreign_relay_bps_limit = self f.foreign_relay_bps_limit = self
.foreign_relay_bps_limit .foreign_relay_bps_limit
.unwrap_or(f.foreign_relay_bps_limit); .unwrap_or(f.foreign_relay_bps_limit);
f.instance_recv_bps_limit = self
.instance_recv_bps_limit
.unwrap_or(f.instance_recv_bps_limit);
f.multi_thread_count = self.multi_thread_count.unwrap_or(f.multi_thread_count); f.multi_thread_count = self.multi_thread_count.unwrap_or(f.multi_thread_count);
f.disable_relay_kcp = self.disable_relay_kcp.unwrap_or(f.disable_relay_kcp); f.disable_relay_kcp = self.disable_relay_kcp.unwrap_or(f.disable_relay_kcp);
f.disable_relay_quic = self.disable_relay_quic.unwrap_or(f.disable_relay_quic); f.disable_relay_quic = self.disable_relay_quic.unwrap_or(f.disable_relay_quic);
+6
View File
@@ -826,6 +826,10 @@ impl NetworkConfig {
flags.mtu = mtu as u32; flags.mtu = mtu as u32;
} }
if let Some(instance_recv_bps_limit) = self.instance_recv_bps_limit {
flags.instance_recv_bps_limit = instance_recv_bps_limit;
}
if let Some(enable_private_mode) = self.enable_private_mode { if let Some(enable_private_mode) = self.enable_private_mode {
flags.private_mode = enable_private_mode; flags.private_mode = enable_private_mode;
} }
@@ -978,6 +982,8 @@ impl NetworkConfig {
result.disable_sym_hole_punching = Some(flags.disable_sym_hole_punching); result.disable_sym_hole_punching = Some(flags.disable_sym_hole_punching);
result.enable_magic_dns = Some(flags.accept_dns); result.enable_magic_dns = Some(flags.accept_dns);
result.mtu = Some(flags.mtu as i32); result.mtu = Some(flags.mtu as i32);
result.instance_recv_bps_limit =
(flags.instance_recv_bps_limit != u64::MAX).then_some(flags.instance_recv_bps_limit);
result.enable_private_mode = Some(flags.private_mode); result.enable_private_mode = Some(flags.private_mode);
if flags.relay_network_whitelist == "*" { if flags.relay_network_whitelist == "*" {
+11
View File
@@ -1365,6 +1365,17 @@ impl PeerConn {
&format!("{}:recv", conn_info_for_instrument.network_name), &format!("{}:recv", conn_info_for_instrument.network_name),
limiter_config.into(), limiter_config.into(),
)) ))
} else if self.global_ctx.get_flags().instance_recv_bps_limit != u64::MAX {
let limiter_config = LimiterConfig {
burst_rate: None,
bps: Some(self.global_ctx.get_flags().instance_recv_bps_limit),
fill_duration_ms: None,
};
Some(
self.global_ctx
.token_bucket_manager()
.get_or_create("instance:recv", limiter_config.into()),
)
} else { } else {
None None
}; };
+1
View File
@@ -87,6 +87,7 @@ message NetworkConfig {
optional string credential_file = 57; optional string credential_file = 57;
optional bool lazy_p2p = 58; optional bool lazy_p2p = 58;
optional bool need_p2p = 59; optional bool need_p2p = 59;
optional uint64 instance_recv_bps_limit = 60;
} }
message PortForwardConfig { message PortForwardConfig {
+1
View File
@@ -73,6 +73,7 @@ message FlagsInConfig {
bool lazy_p2p = 37; bool lazy_p2p = 37;
bool need_p2p = 38; bool need_p2p = 38;
uint64 instance_recv_bps_limit = 39;
} }
message RpcDescriptor { message RpcDescriptor {
+42
View File
@@ -1535,6 +1535,48 @@ pub async fn relay_bps_limit_test(#[values(100, 200, 400, 800)] bps_limit: u64)
drop_insts(insts).await; drop_insts(insts).await;
} }
#[rstest::rstest]
#[serial_test::serial]
#[tokio::test]
pub async fn instance_recv_bps_limit_test(#[values(100, 800)] bps_limit: u64) {
let insts = init_three_node_ex(
"tcp",
|cfg| {
if cfg.get_inst_name() == "inst2" {
let mut f = cfg.get_flags();
f.instance_recv_bps_limit = bps_limit * 1024;
cfg.set_flags(f);
}
cfg
},
false,
)
.await;
let tcp_listener = TcpTunnelListener::new("tcp://0.0.0.0:22223".parse().unwrap());
let tcp_connector = TcpTunnelConnector::new("tcp://10.144.144.3:22223".parse().unwrap());
let bps = _tunnel_bench_netns(
tcp_listener,
tcp_connector,
NetNS::new(Some("net_c".into())),
NetNS::new(Some("net_a".into())),
)
.await;
println!("bps: {}", bps);
let bps = bps as u64 / 1024;
assert!(
bps >= bps_limit - 50 && bps <= bps_limit + 50,
"bps: {}, bps_limit: {}",
bps,
bps_limit
);
drop_insts(insts).await;
}
async fn assert_try_direct_connect_err<C>(inst: &Instance, connector: C) async fn assert_try_direct_connect_err<C>(inst: &Instance, connector: C)
where where
C: crate::tunnel::TunnelConnector + std::fmt::Debug, C: crate::tunnel::TunnelConnector + std::fmt::Debug,