diff --git a/Cargo.lock b/Cargo.lock index 0f5dd647..06b919f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2273,6 +2273,7 @@ dependencies = [ "gethostname 0.5.0", "git-version", "globwalk", + "guarden", "hickory-client", "hickory-proto", "hickory-resolver", @@ -2456,6 +2457,7 @@ dependencies = [ "dashmap", "easytier", "futures", + "guarden", "jsonwebtoken", "mimalloc", "mockall", @@ -3590,6 +3592,28 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "guarden" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca87812d87fa82896df1adfb5c111cdeaae3edb6da028f5df002dcbd7df71454" +dependencies = [ + "futures", + "guarden-macros", + "tokio", +] + +[[package]] +name = "guarden-macros" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b42f4b8de91cbd793ce8e6cf8d4821ef02d2d5b4468e0a55a36c65c5581de53" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "h2" version = "0.4.7" @@ -3705,12 +3729,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hermit-abi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" - [[package]] name = "hermit-abi" version = "0.5.2" @@ -4026,7 +4044,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.1", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -4695,9 +4713,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.172" +version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] name = "libdbus-sys" @@ -5043,14 +5061,13 @@ dependencies = [ [[package]] name = "mio" -version = "1.0.2" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ - "hermit-abi 0.3.9", "libc", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -6551,7 +6568,7 @@ checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" dependencies = [ "cfg-if", "concurrent-queue", - "hermit-abi 0.5.2", + "hermit-abi", "pin-project-lite", "rustix 1.0.7", "windows-sys 0.61.2", @@ -8650,12 +8667,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.1" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -9774,9 +9791,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.48.0" +version = "1.52.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" +checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" dependencies = [ "bytes", "libc", @@ -9784,7 +9801,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.1", + "socket2 0.6.3", "tokio-macros", "tracing", "windows-sys 0.61.2", @@ -9792,9 +9809,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" dependencies = [ "proc-macro2", "quote", diff --git a/easytier-contrib/easytier-uptime/Cargo.toml b/easytier-contrib/easytier-uptime/Cargo.toml index 9643e92f..2f67757d 100644 --- a/easytier-contrib/easytier-uptime/Cargo.toml +++ b/easytier-contrib/easytier-uptime/Cargo.toml @@ -12,6 +12,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" chrono = { version = "0.4", features = ["serde"] } uuid = { version = "1.0", features = ["v4", "serde"] } +guarden = "0.1" # Axum web framework axum = { version = "0.8.4", features = ["macros"] } diff --git a/easytier-contrib/easytier-uptime/src/health_checker.rs b/easytier-contrib/easytier-uptime/src/health_checker.rs index 9346e7ef..13c0f91e 100644 --- a/easytier-contrib/easytier-uptime/src/health_checker.rs +++ b/easytier-contrib/easytier-uptime/src/health_checker.rs @@ -10,9 +10,9 @@ use easytier::{ common::config::{ ConfigFileControl, ConfigLoader, NetworkIdentity, PeerConfig, TomlConfigLoader, }, - defer, instance_manager::NetworkInstanceManager, }; +use guarden::defer; use serde::{Deserialize, Serialize}; use sqlx::any; use tokio_util::task::AbortOnDropHandle; diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index e136e43d..cd6d876c 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -50,6 +50,8 @@ time = "0.3" toml = "0.8.12" chrono = { version = "0.4.37", features = ["serde"] } +guarden = "0.1" + delegate = "0.13.5" itertools = "0.14.0" diff --git a/easytier/src/common/dns.rs b/easytier/src/common/dns.rs index dd7eaef1..20922526 100644 --- a/easytier/src/common/dns.rs +++ b/easytier/src/common/dns.rs @@ -121,9 +121,8 @@ pub async fn socket_addrs( #[cfg(test)] mod tests { - use crate::defer; - use super::*; + use guarden::defer; #[tokio::test] async fn test_socket_addrs() { diff --git a/easytier/src/connector/udp_hole_punch/common.rs b/easytier/src/connector/udp_hole_punch/common.rs index 58a18f37..2b0d40be 100644 --- a/easytier/src/connector/udp_hole_punch/common.rs +++ b/easytier/src/connector/udp_hole_punch/common.rs @@ -6,6 +6,7 @@ use std::{ use crossbeam::atomic::AtomicCell; use dashmap::{DashMap, DashSet}; +use guarden::defer; use rand::seq::SliceRandom as _; use tokio::{net::UdpSocket, sync::Mutex, task::JoinSet}; use tracing::{Instrument, Level, instrument}; @@ -15,7 +16,6 @@ use crate::{ common::{ PeerId, error::Error, global_ctx::ArcGlobalCtx, join_joinset_background, netns::NetNS, upnp, }, - defer, peers::peer_manager::PeerManager, proto::common::NatType, tunnel::{ diff --git a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs index 213664ec..b527a508 100644 --- a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs +++ b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs @@ -9,6 +9,7 @@ use std::{ }; use anyhow::Context; +use guarden::defer; use rand::{Rng, seq::SliceRandom}; use tokio::{net::UdpSocket, sync::RwLock}; use tokio_util::task::AbortOnDropHandle; @@ -22,7 +23,6 @@ use crate::{ }, handle_rpc_result, }, - defer, peers::peer_manager::PeerManager, proto::{ peer_rpc::{ diff --git a/easytier/src/core.rs b/easytier/src/core.rs index d8b3c354..d29d7708 100644 --- a/easytier/src/core.rs +++ b/easytier/src/core.rs @@ -12,7 +12,6 @@ use crate::{ constants::EASYTIER_VERSION, log, }, - defer, instance_manager::NetworkInstanceManager, launcher::add_proxy_network_to_config, proto::common::{CompressionAlgoPb, SecureModeConfig}, @@ -23,6 +22,7 @@ use crate::{ use anyhow::Context; use cidr::IpCidr; use clap::{CommandFactory, Parser}; +use guarden::defer; use rust_i18n::t; use std::{ net::{IpAddr, SocketAddr}, diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index 53dbbbfa..6025d307 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -7,6 +7,7 @@ use std::{ use anyhow::Context; use bytes::Bytes; use dashmap::DashMap; +use guarden::defer; use kcp_sys::{ endpoint::{ConnId, KcpEndpoint, KcpPacketReceiver}, ffi_safe::KcpConfig, @@ -359,7 +360,7 @@ impl KcpProxyDst { transport_type: TcpProxyEntryTransportType::Kcp.into(), }, ); - crate::defer! { + defer! { proxy_entries.remove(&conn_id); if proxy_entries.capacity() - proxy_entries.len() > 16 { proxy_entries.shrink_to_fit(); diff --git a/easytier/src/gateway/quic_proxy.rs b/easytier/src/gateway/quic_proxy.rs index eb468037..5d9d90ab 100644 --- a/easytier/src/gateway/quic_proxy.rs +++ b/easytier/src/gateway/quic_proxy.rs @@ -24,6 +24,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; use derivative::Derivative; use derive_more::{Constructor, Deref, DerefMut, From, Into}; +use guarden::defer; use prost::Message; use quinn::udp::{EcnCodepoint, RecvMeta, Transmit}; use quinn::{ @@ -662,7 +663,7 @@ impl QuicStreamReceiver { transport_type: TcpProxyEntryTransportType::Quic.into(), }, ); - crate::defer! { + defer! { proxy_entries.remove(&handle); if proxy_entries.capacity() - proxy_entries.len() > 16 { proxy_entries.shrink_to_fit(); diff --git a/easytier/src/instance/dns_server/system_config/linux.rs b/easytier/src/instance/dns_server/system_config/linux.rs index 834ef85c..2b406195 100644 --- a/easytier/src/instance/dns_server/system_config/linux.rs +++ b/easytier/src/instance/dns_server/system_config/linux.rs @@ -1,6 +1,5 @@ // translated from tailscale #32ce1bdb48078ec4cedaeeb5b1b2ff9c0ef61a49 -use crate::defer; use anyhow::{Context, Result}; use dbus::blocking::stdintf::org_freedesktop_dbus::Properties as _; use std::fs; @@ -167,6 +166,7 @@ fn new_os_configurator(_interface_name: String) -> Result<()> { Ok(()) } +use guarden::defer; use std::io::{self, BufRead, Cursor}; /// 返回 `resolv.conf` 内容的拥有者("systemd-resolved"、"NetworkManager"、"resolvconf" 或空字符串) diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 9849edc7..cac6960f 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -12,6 +12,7 @@ use std::{ use base64::Engine as _; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; +use guarden::guard; use hmac::Mac; use prost::Message; @@ -40,7 +41,6 @@ use crate::{ error::Error, global_ctx::ArcGlobalCtx, }, - guard, peers::peer_session::{PeerSessionStore, SessionKey, UpsertResponderSessionReturn}, proto::{ api::instance::{PeerConnInfo, PeerConnStats}, diff --git a/easytier/src/proto/rpc_impl/bidirect.rs b/easytier/src/proto/rpc_impl/bidirect.rs index 9ceb436c..e93babcb 100644 --- a/easytier/src/proto/rpc_impl/bidirect.rs +++ b/easytier/src/proto/rpc_impl/bidirect.rs @@ -1,10 +1,10 @@ use std::sync::{Arc, Mutex, atomic::AtomicBool}; use futures::{SinkExt as _, StreamExt}; +use guarden::defer; use tokio::{task::JoinSet, time::timeout}; use crate::{ - defer, proto::rpc_types::error::Error, tunnel::{Tunnel, packet_def::PacketType, ring::create_ring_tunnel_pair}, }; diff --git a/easytier/src/proto/rpc_impl/client.rs b/easytier/src/proto/rpc_impl/client.rs index 9ae04c48..d7960e41 100644 --- a/easytier/src/proto/rpc_impl/client.rs +++ b/easytier/src/proto/rpc_impl/client.rs @@ -4,18 +4,17 @@ use std::sync::{Arc, Mutex}; use bytes::Bytes; use dashmap::DashMap; +use guarden::defer; use prost::Message; use tokio::sync::mpsc; use tokio::task::JoinSet; use tokio::time::timeout; use tokio_stream::StreamExt; -use crate::common::shrink_dashmap; use crate::common::{ - PeerId, + PeerId, shrink_dashmap, stats_manager::{LabelSet, LabelType, MetricName, StatsManager}, }; -use crate::defer; use crate::proto::common::{ CompressionAlgoPb, RpcCompressionInfo, RpcDescriptor, RpcPacket, RpcRequest, RpcResponse, }; diff --git a/easytier/src/utils/guard.rs b/easytier/src/utils/guard.rs deleted file mode 100644 index eca86824..00000000 --- a/easytier/src/utils/guard.rs +++ /dev/null @@ -1,638 +0,0 @@ -//! # Guard Module Utilities -//! -//! This module provides mechanisms for scope-based resource management and deferred execution. -//! -//! ### ⚠️ Critical Usage Note: Diverging Expressions -//! -//! Do not use "naked" diverging expressions—such as `panic!`, `todo!`, or `loop {}`—as -//! the sole content of sync guard closure. This prevents the compiler from -//! distinguishing between synchronous (`ASYNC = false`) and asynchronous -//! (`ASYNC = true`) implementations, leading to a type inference error (E0277). -//! -//! ### Technical Context -//! -//! The `!` (Never Type) is a bottom type that can be coerced into any other type. -//! Because it satisfies both the `()` requirement for sync guards and the `Future` -//! requirement for async guards, the compiler encounters an inference deadlock. -//! -//! ### Workaround -//! -//! For macros like `guard!` or `guarded!`, force the closure to resolve to `()` -//! by explicitly setting the guard to `sync`: -//! -//! ```rust -//! let _g = guard!([val] sync { -//! panic!("critical failure"); -//! }); -//! ``` - -use crate::utils::task::{DetachableTask, TaskSpawner}; -use std::fmt::Debug; -use std::mem::ManuallyDrop; -use std::ops::{Deref, DerefMut}; - -pub trait CallableGuard { - type Output; - fn call(self, context: Context) -> Self::Output; -} - -impl CallableGuard for Guard -where - Guard: FnOnce(Context), -{ - type Output = (); - - fn call(self, context: Context) -> Self::Output { - self(context) - } -} - -impl CallableGuard for Guard -where - Guard: FnOnce(Context) -> Task + Send + 'static, - Task: Future + Send + 'static, - _R: Send + 'static, -{ - type Output = DetachableTask, Task>; - - fn call(self, context: Context) -> Self::Output { - DetachableTask::new(self(context)) - } -} - -pub struct ContextGuard> { - context: ManuallyDrop, - guard: ManuallyDrop, -} - -impl> Deref - for ContextGuard -{ - type Target = Context; - - fn deref(&self) -> &Self::Target { - &self.context - } -} - -impl> DerefMut - for ContextGuard -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.context - } -} - -impl> Debug - for ContextGuard -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let name = if ASYNC { - "ContextGuard::Async" - } else { - "ContextGuard::Sync" - }; - f.debug_struct(name) - .field("context", &self.context) - .finish_non_exhaustive() - } -} - -impl> - ContextGuard -{ - /// Creates a new `ContextGuard`. - /// - /// **Note on generics:** The seemingly unused `_R` generic parameter and the - /// `Guard: FnOnce(Context) -> _R` trait bound are intentionally included. - /// They act as a hint to help the compiler infer closure types. - pub fn new<_R>(context: Context, guard: Guard) -> Self - where - Guard: FnOnce(Context) -> _R, - { - ContextGuard { - context: ManuallyDrop::new(context), - guard: ManuallyDrop::new(guard), - } - } -} - -impl> - ContextGuard -{ - unsafe fn call(&mut self) -> Guard::Output { - unsafe { - let context = ManuallyDrop::take(&mut self.context); - let guard = ManuallyDrop::take(&mut self.guard); - - guard.call(context) - } - } - - pub fn trigger(self) -> Guard::Output { - let mut this = ManuallyDrop::new(self); - unsafe { this.call() } - } - - pub fn defuse(self) -> Context { - let mut this = ManuallyDrop::new(self); - unsafe { - ManuallyDrop::drop(&mut this.guard); - ManuallyDrop::take(&mut this.context) - } - } -} - -impl> Drop - for ContextGuard -{ - fn drop(&mut self) { - let _: Guard::Output = unsafe { self.call() }; - } -} - -// region macro - -#[doc(hidden)] -#[macro_export] -macro_rules! __guarded { - (@parse@action $guard:ident => $($tt:tt)*) => { - $crate::__guarded! { @parse@async action: [ @stmt $guard ] ; $($tt)* } - }; - - (@parse@action $($tt:tt)*) => { - $crate::__guarded! { @parse@async action: [ @stmt __guard ] ; $($tt)* } - }; - - (@parse@async action: [ $($action:tt)* ] ; sync $($tt:tt)*) => { - $crate::__guarded! { @parse@move action: [ $($action)* ] ; async: [ false ] ; $($tt)* } - }; - - (@parse@async action: [ $($action:tt)* ] ; $($tt:tt)*) => { - $crate::__guarded! { @parse@move action: [ $($action)* ] ; async: [ _ ] ; $($tt)* } - }; - - (@parse@move action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move $($tt:tt)*) => { - $crate::__guarded! { @parse action: [ $($action)* ] ; async: [ $async ] ; move: [ move ] ; $($tt)* } - }; - - (@parse@move action: [ $($action:tt)* ] ; async: [ $async:tt ] ; $($tt:tt)*) => { - $crate::__guarded! { @parse action: [ $($action)* ] ; async: [ $async ] ; move: [] ; $($tt)* } - }; - - ( - @parse action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move: [ $($move:tt)? ] ; - [ $($args:tt)* ] $body:block - ) => { - $crate::__guarded! { - action: [ $($action)* ] - async: [ $async ] - move: [ $($move)? ] - mut: [] - rest: [ $($args)* , ] - args: [] - vars: [] - body: [ $body ] - } - }; - - ( - @parse action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move: [ $($move:tt)? ] ; - $body:block - ) => { - $crate::__guarded! { - @parse action: [ $($action)* ] ; async: [ $async ] ; move: [ $($move)? ] ; - [] $body - } - }; - - ( - @parse action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move: [ $($move:tt)? ] ; - [ $($args:tt)* ] $($body:tt)* - ) => { - $crate::__guarded! { - @parse action: [ $($action)* ] ; async: [ $async ] ; move: [ $($move)? ] ; - [ $($args)* ] { $($body)* } - } - }; - - ( - @parse action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move: [ $($move:tt)? ] ; - $($body:tt)* - ) => { - $crate::__guarded! { - @parse action: [ $($action)* ] ; async: [ $async ] ; move: [ $($move)? ] ; - [] { $($body)* } - } - }; - - ( - action: [ $($action:tt)* ] - async: [ $async:tt ] - move: [ $($move:tt)? ] - mut: [ $($mut:tt)? ] - rest: [ mut $arg:ident , $($rest:tt)* ] - args: [ $($args:ident)* ] - vars: [ $($vars:tt)* ] - body: [ $body:expr ] - ) => { - $crate::__guarded! { - action: [ $($action)* ] - async: [ $async ] - move: [ $($move)? ] - mut: [ mut ] - rest: [ $($rest)* ] - args: [ $($args)* $arg ] - vars: [ $($vars)* [mut $arg] ] - body: [ $body ] - } - }; - - ( - action: [ $($action:tt)* ] - async: [ $async:tt ] - move: [ $($move:tt)? ] - mut: [ $($mut:tt)? ] - rest: [ $arg:ident , $($rest:tt)* ] - args: [ $($args:ident)* ] - vars: [ $($vars:tt)* ] - body: [ $body:expr ] - ) => { - $crate::__guarded! { - action: [ $($action)* ] - async: [ $async ] - move: [ $($move)? ] - mut: [ $($mut)? ] - rest: [ $($rest)* ] - args: [ $($args)* $arg ] - vars: [ $($vars)* [$arg] ] - body: [ $body ] - } - }; - - ( - action: [ @stmt $guard:ident ] - async: [ $async:tt ] - move: [ $($move:tt)? ] - mut: [ $($mut:tt)? ] - rest: [ $(,)* ] - args: [ $($args:ident)* ] - vars: [ $([$($vars:tt)*])* ] - body: [ $body:expr ] - ) => { - let $($mut)? $guard = $crate::utils::guard::ContextGuard::<$async, _, _>::new( - ( $($args),* ), - $($move)? |#[allow(unused_parens, unused_mut)] ( $($($vars)*),* )| $body - ); - - #[allow(unused_parens, unused_variables, clippy::toplevel_ref_arg)] - let ( $(ref $($vars)*),* ) = *$guard; - }; - - ( - action: [ @expr ] - async: [ $async:tt ] - move: [ $($move:tt)? ] - mut: [ $($mut:tt)? ] - rest: [ $(,)* ] - args: [ $($args:ident)* ] - vars: [ $([$($vars:tt)*])* ] - body: [ $body:expr ] - ) => { - $crate::utils::guard::ContextGuard::<$async, _, _>::new( - ( $($args),* ), - $($move)? |#[allow(unused_parens)] ( $($($vars)*),* )| $body - ) - }; -} - -/// Creates a [`ContextGuard`] object, binding it to a variable with the specified name (e.g., `_guard`). -/// Context variables specified in the macro invocation are available within and after the guard body. -/// -/// **Note:** For usage with `panic!` or `loop`, see the [module-level documentation](self) -/// regarding type inference deadlocks. -#[macro_export] -macro_rules! guarded { - ( $($tt:tt)* ) => { - $crate::__guarded! { @parse@action $($tt)* } - }; -} - -/// Creates a [`ContextGuard`] object, without binding it to a variable. -/// Context variables specified in the macro invocation are available within the guard body. -/// -/// **Note:** For usage with `panic!` or `loop`, see the [module-level documentation](self) -/// regarding type inference deadlocks. -#[macro_export] -macro_rules! guard { - ( $($tt:tt)* ) => { - $crate::__guarded! { @parse@async action: [ @expr ] ; $($tt)* } - }; -} - -// endregion - -/// Alias for [`guarded!`]. -/// -/// **Note:** For usage with `panic!` or `loop`, see the [module-level documentation](self) -/// regarding type inference deadlocks. -#[macro_export] -macro_rules! defer { - ( $($tt:tt)* ) => { - $crate::guarded! { $($tt)* } - }; -} - -#[cfg(test)] -mod tests { - use std::panic::catch_unwind; - use std::sync::Arc; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::time::Duration; - use tokio::sync::oneshot; - - #[test] - fn trigger_sync_executes_once() { - let called = Arc::new(AtomicUsize::new(0)); - let observed = Arc::new(AtomicUsize::new(0)); - - let value = 7usize; - let guard = { - let called = called.clone(); - let observed = observed.clone(); - crate::guard!(move [value] { - called.fetch_add(1, Ordering::SeqCst); - observed.store(value, Ordering::SeqCst); - }) - }; - - guard.trigger(); - - assert_eq!(called.load(Ordering::SeqCst), 1); - assert_eq!(observed.load(Ordering::SeqCst), 7); - } - - #[test] - fn defuse_sync_returns_context_without_running_guard() { - let called = Arc::new(AtomicUsize::new(0)); - - let value = String::from("hello"); - let guard = { - let called = called.clone(); - crate::guard!(move [mut value] { - value.push_str(" world"); - called.fetch_add(1, Ordering::SeqCst); - }) - }; - - let context = guard.defuse(); - assert_eq!(context, "hello"); - assert_eq!(called.load(Ordering::SeqCst), 0); - } - - #[test] - fn drop_sync_triggers_guard() { - let called = Arc::new(AtomicUsize::new(0)); - - { - let called = called.clone(); - crate::guarded!([called] { - called.fetch_add(1, Ordering::SeqCst); - }); - } - - assert_eq!(called.load(Ordering::SeqCst), 1); - } - - #[test] - fn drop_propagates_guard_panic() { - let dropped = catch_unwind(|| { - guarded! { - sync { - panic!("boom"); - } - } - }); - - assert!(dropped.is_err()); - } - - #[tokio::test] - async fn trigger_async_returns_runnable_task() { - let called = Arc::new(AtomicUsize::new(0)); - - let value = 5usize; - let guard = { - let called = called.clone(); - crate::guard!(move [value] async move { - called.fetch_add(value, Ordering::SeqCst); - }) - }; - let task = guard.trigger(); - task.await; - - assert_eq!(called.load(Ordering::SeqCst), 5); - } - - #[tokio::test] - async fn drop_async_detaches_task() { - let (tx, rx) = oneshot::channel(); - - { - let mut tx = Some(tx); - let value = 9usize; - let _guard = crate::guard!(move [value] { - let tx = tx.take(); - async move { - if let Some(tx) = tx { - let _ = tx.send(value); - } - } - }); - } - - let value = tokio::time::timeout(Duration::from_secs(1), rx) - .await - .expect("detached task should run") - .expect("detached task should send value"); - assert_eq!(value, 9); - } - - #[tokio::test] - async fn defuse_async_does_not_execute() { - let called = Arc::new(AtomicUsize::new(0)); - - let value = 11usize; - let guard = { - let called = called.clone(); - crate::guard!(move [value] async move { - called.fetch_add(value, Ordering::SeqCst); - }) - }; - - let context = guard.defuse(); - assert_eq!(context, 11); - - tokio::time::sleep(Duration::from_millis(20)).await; - assert_eq!(called.load(Ordering::SeqCst), 0); - } - - #[test] - fn guarded_named_mut_binding_updates_context_before_drop() { - let committed = Arc::new(AtomicUsize::new(0)); - - { - let value = 1usize; - let step = 2usize; - let committed = committed.clone(); - - crate::guarded!(scope_guard => [mut value, step] { - committed.store(value + step, Ordering::SeqCst); - }); - - *value += 10; - assert_eq!(*value, 11); - assert_eq!(*step, 2); - - drop(scope_guard); - } - - assert_eq!(committed.load(Ordering::SeqCst), 13); - } - - #[test] - fn guard_expression_parses_without_braces() { - let observed = Arc::new(AtomicUsize::new(0)); - - let value = 3usize; - let observed_clone = observed.clone(); - let guard = crate::guard!([value] observed_clone.store(value, Ordering::SeqCst)); - guard.trigger(); - - assert_eq!(observed.load(Ordering::SeqCst), 3); - } - - #[test] - fn defer_alias_behaves_like_guarded_statement() { - let called = Arc::new(AtomicUsize::new(0)); - - { - let n = 42usize; - let called = called.clone(); - crate::defer!([n] { - called.store(n, Ordering::SeqCst); - }); - } - - assert_eq!(called.load(Ordering::SeqCst), 42); - } - - #[tokio::test] - async fn guard_and_guarded_macro_usage_matrix() { - // 1) guard!: block body + trailing comma args + trigger() - let sink = Arc::new(AtomicUsize::new(0)); - let v = 1usize; - let sink_clone = sink.clone(); - let g1 = crate::guard!([v,] { - sink_clone.store(v, Ordering::SeqCst); - }); - g1.trigger(); - assert_eq!(sink.load(Ordering::SeqCst), 1); - - // 2) guard!: expression body (no braces) - let sink = Arc::new(AtomicUsize::new(0)); - let sink_clone = sink.clone(); - let v = 2usize; - let g2 = crate::guard!([v] sink_clone.store(v, Ordering::SeqCst)); - g2.trigger(); - assert_eq!(sink.load(Ordering::SeqCst), 2); - - // 3) guard!: explicit sync + no args form - let sink = Arc::new(AtomicUsize::new(0)); - let sink_clone = sink.clone(); - let g3 = crate::guard!(sync { - sink_clone.store(3, Ordering::SeqCst); - }); - g3.trigger(); - assert_eq!(sink.load(Ordering::SeqCst), 3); - - // 4) guard!: move capture + defuse() prevents execution - let sink = Arc::new(AtomicUsize::new(0)); - let owned = String::from("owned"); - let sink_clone = sink.clone(); - let g4 = crate::guard!(move [owned] { - if owned == "owned" { - sink_clone.store(4, Ordering::SeqCst); - } - }); - let context = g4.defuse(); - assert_eq!(context, "owned"); - assert_eq!(sink.load(Ordering::SeqCst), 0); - - // 5) guard!: async block inference + trigger() returns task - let sink = Arc::new(AtomicUsize::new(0)); - let sink_clone = sink.clone(); - let n = 5usize; - let g5 = crate::guard!([n] async move { - sink_clone.fetch_add(n, Ordering::SeqCst); - }); - g5.trigger().await; - assert_eq!(sink.load(Ordering::SeqCst), 5); - - // 6) guarded!: named binding + mut arg visible outside + explicit drop - let sink = Arc::new(AtomicUsize::new(0)); - { - let value = 6usize; - let delta = 1usize; - let sink_clone = sink.clone(); - - crate::guarded!(named => [mut value, delta] { - sink_clone.store(value + delta, Ordering::SeqCst); - }); - - *value += 10; - assert_eq!(*value, 16); - assert_eq!(*delta, 1); - drop(named); - } - assert_eq!(sink.load(Ordering::SeqCst), 17); - - // 7) guarded!: unnamed statement + expression body + implicit drop at scope end - let sink = Arc::new(AtomicUsize::new(0)); - { - let n = 7usize; - let sink_clone = sink.clone(); - crate::guarded!([n] sink_clone.store(n, Ordering::SeqCst)); - } - assert_eq!(sink.load(Ordering::SeqCst), 7); - - // 8) guarded!: explicit sync + panic path propagates on drop - let dropped = catch_unwind(|| { - guarded! { - sync { - panic!("matrix-boom"); - } - } - }); - assert!(dropped.is_err()); - - // 9) guarded!: async inference on drop detaches and executes - let (tx, rx) = oneshot::channel(); - { - let tx = Some(tx); - crate::guarded!([mut tx] { - let tx = tx.take(); - async move { - if let Some(tx) = tx { - let _ = tx.send(9usize); - } - } - }); - } - let detached = tokio::time::timeout(Duration::from_secs(1), rx) - .await - .expect("detached task should complete") - .expect("detached task should send value"); - assert_eq!(detached, 9); - } -} diff --git a/easytier/src/utils/mod.rs b/easytier/src/utils/mod.rs index 192dd9b9..1339c10d 100644 --- a/easytier/src/utils/mod.rs +++ b/easytier/src/utils/mod.rs @@ -1,4 +1,3 @@ -pub mod guard; pub mod panic; pub mod string; pub mod task; diff --git a/easytier/src/utils/task.rs b/easytier/src/utils/task.rs index dcad7933..b0ac0b40 100644 --- a/easytier/src/utils/task.rs +++ b/easytier/src/utils/task.rs @@ -1,7 +1,5 @@ -use crate::utils::guard::ContextGuard; use std::future::Future; use std::io; -use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; @@ -80,284 +78,3 @@ impl Future for CancellableTask { } // endregion - -// region DetachableTask - -/// A pinned, heap-allocated task. -/// -/// **Why Box?** Heap allocation is required because if the task detaches, -/// it outlives the current stack frame. `Pin>` ensures its memory address -/// remains completely stable during and after the transfer. -type BoxTask = Pin>; - -struct DetachableTaskContext { - spawner: Spawner, - task: Option>, -} -type DetachableTaskGuardHelper = ContextGuard; -type DetachableTaskGuard = - DetachableTaskGuardHelper>; - -/// A task wrapper that executes inline but automatically detaches to a background spawner -/// if the current execution context is interrupted or dropped. -/// -/// `DetachableTask` ensures anti-cancellation. If the outer future is dropped (e.g., due to -/// a timeout or a `select!` branch failing), the underlying unfinished task is seamlessly -/// transferred to a background executor via an RAII guard. -/// -/// # Advantages over `tokio::spawn` + `.await JoinHandle` -/// -/// 1. **Zero Initial Scheduling Overhead**: Prioritizes inline execution. If the task -/// completes before being interrupted, it entirely bypasses the runtime's scheduling queue, -/// eliminating queuing latency and context-switching CPU costs. Spawning is strictly a fallback. -/// -/// 2. **Context Locality**: Before detachment, the task is polled directly by the caller's thread. -/// This implicitly preserves the current execution context, including thread-local storage (TLS), -/// Tokio `task_local!` variables, and `tracing` spans, which would otherwise be immediately -/// lost or require explicit propagation across task boundaries. -pub struct DetachableTask { - guard: DetachableTaskGuard, -} - -impl DetachableTask { - pub fn detach(self) { - self.guard.trigger() - } - - pub fn reclaim(self) -> BoxTask { - self.guard.defuse().task.unwrap() - } -} - -pub type TaskSpawner::Output>> = fn(BoxTask) -> R; - -impl DetachableTask { - pub fn with_spawner( - spawner: Spawner, - task: Task, - ) -> DetachableTask - where - Spawner: FnOnce(BoxTask) -> _R, - { - let context = DetachableTaskContext { - spawner, - task: Some(Box::pin(task)), - }; - DetachableTask { - guard: crate::guard!([context] if let Some(task) = context.task { - (context.spawner)(task); - }), - } - } - - pub fn new(task: Task) -> DetachableTask, Task> - where - Task: Future + Send + 'static, - ::Output: Send + 'static, - { - Self::with_spawner(|task| tokio::runtime::Handle::current().spawn(task), task) - } -} - -impl) -> _R, _R, Task> IntoFuture for DetachableTask -where - Task: Future, -{ - type Output = Task::Output; - type IntoFuture = DetachableTaskFuture; - - fn into_future(self) -> Self::IntoFuture { - DetachableTaskFuture { guard: self.guard } - } -} - -pub struct DetachableTaskFuture { - guard: DetachableTaskGuard, -} - -impl) -> _R, _R, Task> Future for DetachableTaskFuture -where - Task: Future, -{ - type Output = Task::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // SAFETY: - // 1. We only access the outer struct's unpinned fields. - // 2. The inner task remains securely pinned on the heap via `BoxTask`. - // 3. We never expose a mutable, unpinned reference to the underlying task. - let this = unsafe { self.get_unchecked_mut() }; - let context = this.guard.deref_mut(); - let mut task = context.task.take().expect("polled after completion"); - let poll = task.as_mut().poll(cx); - if poll.is_pending() { - context.task = Some(task); - } - poll - } -} - -// endregion - -#[cfg(test)] -mod tests { - use super::*; - use std::sync::Arc; - use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; - use std::time::Duration; - use tokio::sync::{mpsc, oneshot}; - - #[tokio::test] - async fn spawn_when_dropped() { - let spawned = Arc::new(AtomicBool::new(false)); - { - let spawned = spawned.clone(); - let _task = DetachableTask::new(async move { - spawned.store(true, Ordering::SeqCst); - }); - } - - tokio::time::timeout(Duration::from_secs(1), async { - while !spawned.load(Ordering::SeqCst) { - tokio::task::yield_now().await; - } - }) - .await - .expect("task should be spawned on drop"); - } - - #[tokio::test] - async fn await_completed_task_does_not_detach() { - let spawn_count = Arc::new(AtomicUsize::new(0)); - let result = { - let spawn_count = spawn_count.clone(); - DetachableTask::with_spawner( - move |_| { - spawn_count.fetch_add(1, Ordering::SeqCst); - }, - async { 7usize }, - ) - .await - }; - - assert_eq!(result, 7); - assert_eq!(spawn_count.load(Ordering::SeqCst), 0); - } - - #[tokio::test] - async fn drop_without_await_and_runs_once() { - let spawn_count = Arc::new(AtomicUsize::new(0)); - let (done_tx, done_rx) = oneshot::channel(); - - { - let spawn_count = spawn_count.clone(); - let _task = DetachableTask::with_spawner( - move |f| { - spawn_count.fetch_add(1, Ordering::SeqCst); - tokio::spawn(async move { - let result = f.await; - let _ = done_tx.send(result); - }); - }, - async { 42usize }, - ); - } - - let detached_result = tokio::time::timeout(Duration::from_secs(1), done_rx) - .await - .expect("detached task should finish") - .expect("detached task should send result"); - - assert_eq!(detached_result, 42); - assert_eq!(spawn_count.load(Ordering::SeqCst), 1); - } - - #[tokio::test] - async fn drop_after_await_still_detaches() { - let spawn_count = Arc::new(AtomicUsize::new(0)); - let (value_tx, mut value_rx) = mpsc::channel(4); - let (done_tx, done_rx) = oneshot::channel(); - - let handle = { - let future = async move { - let mut sum = 0; - while let Some(value) = value_rx.recv().await { - sum += value; - } - sum - }; - - let spawn_count = spawn_count.clone(); - let task = DetachableTask::with_spawner( - move |f| { - spawn_count.fetch_add(1, Ordering::SeqCst); - tokio::spawn(async move { - let result = f.await; - let _ = done_tx.send(result); - }); - }, - future, - ); - - tokio::spawn(task.into_future()) - }; - - value_tx - .send(10) - .await - .expect("value receiver should still exist"); - handle.abort(); - value_tx - .send(11) - .await - .expect("value receiver should still exist"); - drop(value_tx); - - let detached_result = tokio::time::timeout(Duration::from_secs(1), done_rx) - .await - .expect("detached polled task should finish") - .expect("detached polled task should send result"); - - assert_eq!(detached_result, 21); - assert_eq!(spawn_count.load(Ordering::SeqCst), 1); - } - - #[tokio::test] - async fn panic_during_inline_poll_does_not_detach_on_drop() { - struct PanicOnPollFuture { - poll_count: Arc, - } - - impl Future for PanicOnPollFuture { - type Output = (); - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - self.poll_count.fetch_add(1, Ordering::SeqCst); - panic!("panic during inline poll") - } - } - - let poll_count = Arc::new(AtomicUsize::new(0)); - let detach_count = Arc::new(AtomicUsize::new(0)); - - let task = { - let detach_count = detach_count.clone(); - DetachableTask::with_spawner( - move |_| { - detach_count.fetch_add(1, Ordering::SeqCst); - }, - PanicOnPollFuture { - poll_count: poll_count.clone(), - }, - ) - }; - - let err = tokio::spawn(task.into_future()) - .await - .expect_err("inline poll panic should propagate"); - - assert!(err.is_panic()); - assert_eq!(poll_count.load(Ordering::SeqCst), 1); - assert_eq!(detach_count.load(Ordering::SeqCst), 0); - } -}