mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-06 17:59:11 +00:00
utils: replace defer, ContextGuard, DetachableTask with guarden crate (#2163)
This commit is contained in:
Generated
+39
-22
@@ -2273,6 +2273,7 @@ dependencies = [
|
|||||||
"gethostname 0.5.0",
|
"gethostname 0.5.0",
|
||||||
"git-version",
|
"git-version",
|
||||||
"globwalk",
|
"globwalk",
|
||||||
|
"guarden",
|
||||||
"hickory-client",
|
"hickory-client",
|
||||||
"hickory-proto",
|
"hickory-proto",
|
||||||
"hickory-resolver",
|
"hickory-resolver",
|
||||||
@@ -2456,6 +2457,7 @@ dependencies = [
|
|||||||
"dashmap",
|
"dashmap",
|
||||||
"easytier",
|
"easytier",
|
||||||
"futures",
|
"futures",
|
||||||
|
"guarden",
|
||||||
"jsonwebtoken",
|
"jsonwebtoken",
|
||||||
"mimalloc",
|
"mimalloc",
|
||||||
"mockall",
|
"mockall",
|
||||||
@@ -3590,6 +3592,28 @@ dependencies = [
|
|||||||
"syn 2.0.117",
|
"syn 2.0.117",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "guarden"
|
||||||
|
version = "0.1.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ca87812d87fa82896df1adfb5c111cdeaae3edb6da028f5df002dcbd7df71454"
|
||||||
|
dependencies = [
|
||||||
|
"futures",
|
||||||
|
"guarden-macros",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "guarden-macros"
|
||||||
|
version = "0.1.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8b42f4b8de91cbd793ce8e6cf8d4821ef02d2d5b4468e0a55a36c65c5581de53"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.117",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "h2"
|
name = "h2"
|
||||||
version = "0.4.7"
|
version = "0.4.7"
|
||||||
@@ -3705,12 +3729,6 @@ version = "0.5.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
|
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "hermit-abi"
|
|
||||||
version = "0.3.9"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermit-abi"
|
name = "hermit-abi"
|
||||||
version = "0.5.2"
|
version = "0.5.2"
|
||||||
@@ -4026,7 +4044,7 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"socket2 0.6.1",
|
"socket2 0.5.10",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"tracing",
|
"tracing",
|
||||||
@@ -4695,9 +4713,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.172"
|
version = "0.2.186"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
|
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libdbus-sys"
|
name = "libdbus-sys"
|
||||||
@@ -5043,14 +5061,13 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mio"
|
name = "mio"
|
||||||
version = "1.0.2"
|
version = "1.2.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
|
checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"hermit-abi 0.3.9",
|
|
||||||
"libc",
|
"libc",
|
||||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||||
"windows-sys 0.52.0",
|
"windows-sys 0.61.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -6551,7 +6568,7 @@ checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"concurrent-queue",
|
"concurrent-queue",
|
||||||
"hermit-abi 0.5.2",
|
"hermit-abi",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"rustix 1.0.7",
|
"rustix 1.0.7",
|
||||||
"windows-sys 0.61.2",
|
"windows-sys 0.61.2",
|
||||||
@@ -8650,12 +8667,12 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "socket2"
|
name = "socket2"
|
||||||
version = "0.6.1"
|
version = "0.6.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881"
|
checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"windows-sys 0.60.2",
|
"windows-sys 0.61.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -9774,9 +9791,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio"
|
name = "tokio"
|
||||||
version = "1.48.0"
|
version = "1.52.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408"
|
checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"libc",
|
"libc",
|
||||||
@@ -9784,7 +9801,7 @@ dependencies = [
|
|||||||
"parking_lot",
|
"parking_lot",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"signal-hook-registry",
|
"signal-hook-registry",
|
||||||
"socket2 0.6.1",
|
"socket2 0.6.3",
|
||||||
"tokio-macros",
|
"tokio-macros",
|
||||||
"tracing",
|
"tracing",
|
||||||
"windows-sys 0.61.2",
|
"windows-sys 0.61.2",
|
||||||
@@ -9792,9 +9809,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-macros"
|
name = "tokio-macros"
|
||||||
version = "2.6.0"
|
version = "2.7.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
|
checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ serde = { version = "1.0", features = ["derive"] }
|
|||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
chrono = { version = "0.4", features = ["serde"] }
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
uuid = { version = "1.0", features = ["v4", "serde"] }
|
uuid = { version = "1.0", features = ["v4", "serde"] }
|
||||||
|
guarden = "0.1"
|
||||||
|
|
||||||
# Axum web framework
|
# Axum web framework
|
||||||
axum = { version = "0.8.4", features = ["macros"] }
|
axum = { version = "0.8.4", features = ["macros"] }
|
||||||
|
|||||||
@@ -10,9 +10,9 @@ use easytier::{
|
|||||||
common::config::{
|
common::config::{
|
||||||
ConfigFileControl, ConfigLoader, NetworkIdentity, PeerConfig, TomlConfigLoader,
|
ConfigFileControl, ConfigLoader, NetworkIdentity, PeerConfig, TomlConfigLoader,
|
||||||
},
|
},
|
||||||
defer,
|
|
||||||
instance_manager::NetworkInstanceManager,
|
instance_manager::NetworkInstanceManager,
|
||||||
};
|
};
|
||||||
|
use guarden::defer;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sqlx::any;
|
use sqlx::any;
|
||||||
use tokio_util::task::AbortOnDropHandle;
|
use tokio_util::task::AbortOnDropHandle;
|
||||||
|
|||||||
@@ -50,6 +50,8 @@ time = "0.3"
|
|||||||
toml = "0.8.12"
|
toml = "0.8.12"
|
||||||
chrono = { version = "0.4.37", features = ["serde"] }
|
chrono = { version = "0.4.37", features = ["serde"] }
|
||||||
|
|
||||||
|
guarden = "0.1"
|
||||||
|
|
||||||
delegate = "0.13.5"
|
delegate = "0.13.5"
|
||||||
|
|
||||||
itertools = "0.14.0"
|
itertools = "0.14.0"
|
||||||
|
|||||||
@@ -121,9 +121,8 @@ pub async fn socket_addrs(
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::defer;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use guarden::defer;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_socket_addrs() {
|
async fn test_socket_addrs() {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use std::{
|
|||||||
|
|
||||||
use crossbeam::atomic::AtomicCell;
|
use crossbeam::atomic::AtomicCell;
|
||||||
use dashmap::{DashMap, DashSet};
|
use dashmap::{DashMap, DashSet};
|
||||||
|
use guarden::defer;
|
||||||
use rand::seq::SliceRandom as _;
|
use rand::seq::SliceRandom as _;
|
||||||
use tokio::{net::UdpSocket, sync::Mutex, task::JoinSet};
|
use tokio::{net::UdpSocket, sync::Mutex, task::JoinSet};
|
||||||
use tracing::{Instrument, Level, instrument};
|
use tracing::{Instrument, Level, instrument};
|
||||||
@@ -15,7 +16,6 @@ use crate::{
|
|||||||
common::{
|
common::{
|
||||||
PeerId, error::Error, global_ctx::ArcGlobalCtx, join_joinset_background, netns::NetNS, upnp,
|
PeerId, error::Error, global_ctx::ArcGlobalCtx, join_joinset_background, netns::NetNS, upnp,
|
||||||
},
|
},
|
||||||
defer,
|
|
||||||
peers::peer_manager::PeerManager,
|
peers::peer_manager::PeerManager,
|
||||||
proto::common::NatType,
|
proto::common::NatType,
|
||||||
tunnel::{
|
tunnel::{
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
use guarden::defer;
|
||||||
use rand::{Rng, seq::SliceRandom};
|
use rand::{Rng, seq::SliceRandom};
|
||||||
use tokio::{net::UdpSocket, sync::RwLock};
|
use tokio::{net::UdpSocket, sync::RwLock};
|
||||||
use tokio_util::task::AbortOnDropHandle;
|
use tokio_util::task::AbortOnDropHandle;
|
||||||
@@ -22,7 +23,6 @@ use crate::{
|
|||||||
},
|
},
|
||||||
handle_rpc_result,
|
handle_rpc_result,
|
||||||
},
|
},
|
||||||
defer,
|
|
||||||
peers::peer_manager::PeerManager,
|
peers::peer_manager::PeerManager,
|
||||||
proto::{
|
proto::{
|
||||||
peer_rpc::{
|
peer_rpc::{
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ use crate::{
|
|||||||
constants::EASYTIER_VERSION,
|
constants::EASYTIER_VERSION,
|
||||||
log,
|
log,
|
||||||
},
|
},
|
||||||
defer,
|
|
||||||
instance_manager::NetworkInstanceManager,
|
instance_manager::NetworkInstanceManager,
|
||||||
launcher::add_proxy_network_to_config,
|
launcher::add_proxy_network_to_config,
|
||||||
proto::common::{CompressionAlgoPb, SecureModeConfig},
|
proto::common::{CompressionAlgoPb, SecureModeConfig},
|
||||||
@@ -23,6 +22,7 @@ use crate::{
|
|||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use cidr::IpCidr;
|
use cidr::IpCidr;
|
||||||
use clap::{CommandFactory, Parser};
|
use clap::{CommandFactory, Parser};
|
||||||
|
use guarden::defer;
|
||||||
use rust_i18n::t;
|
use rust_i18n::t;
|
||||||
use std::{
|
use std::{
|
||||||
net::{IpAddr, SocketAddr},
|
net::{IpAddr, SocketAddr},
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ use std::{
|
|||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
use guarden::defer;
|
||||||
use kcp_sys::{
|
use kcp_sys::{
|
||||||
endpoint::{ConnId, KcpEndpoint, KcpPacketReceiver},
|
endpoint::{ConnId, KcpEndpoint, KcpPacketReceiver},
|
||||||
ffi_safe::KcpConfig,
|
ffi_safe::KcpConfig,
|
||||||
@@ -359,7 +360,7 @@ impl KcpProxyDst {
|
|||||||
transport_type: TcpProxyEntryTransportType::Kcp.into(),
|
transport_type: TcpProxyEntryTransportType::Kcp.into(),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
crate::defer! {
|
defer! {
|
||||||
proxy_entries.remove(&conn_id);
|
proxy_entries.remove(&conn_id);
|
||||||
if proxy_entries.capacity() - proxy_entries.len() > 16 {
|
if proxy_entries.capacity() - proxy_entries.len() > 16 {
|
||||||
proxy_entries.shrink_to_fit();
|
proxy_entries.shrink_to_fit();
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ use bytes::{BufMut, Bytes, BytesMut};
|
|||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use derivative::Derivative;
|
use derivative::Derivative;
|
||||||
use derive_more::{Constructor, Deref, DerefMut, From, Into};
|
use derive_more::{Constructor, Deref, DerefMut, From, Into};
|
||||||
|
use guarden::defer;
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use quinn::udp::{EcnCodepoint, RecvMeta, Transmit};
|
use quinn::udp::{EcnCodepoint, RecvMeta, Transmit};
|
||||||
use quinn::{
|
use quinn::{
|
||||||
@@ -662,7 +663,7 @@ impl QuicStreamReceiver {
|
|||||||
transport_type: TcpProxyEntryTransportType::Quic.into(),
|
transport_type: TcpProxyEntryTransportType::Quic.into(),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
crate::defer! {
|
defer! {
|
||||||
proxy_entries.remove(&handle);
|
proxy_entries.remove(&handle);
|
||||||
if proxy_entries.capacity() - proxy_entries.len() > 16 {
|
if proxy_entries.capacity() - proxy_entries.len() > 16 {
|
||||||
proxy_entries.shrink_to_fit();
|
proxy_entries.shrink_to_fit();
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
// translated from tailscale #32ce1bdb48078ec4cedaeeb5b1b2ff9c0ef61a49
|
// translated from tailscale #32ce1bdb48078ec4cedaeeb5b1b2ff9c0ef61a49
|
||||||
|
|
||||||
use crate::defer;
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use dbus::blocking::stdintf::org_freedesktop_dbus::Properties as _;
|
use dbus::blocking::stdintf::org_freedesktop_dbus::Properties as _;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
@@ -167,6 +166,7 @@ fn new_os_configurator(_interface_name: String) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use guarden::defer;
|
||||||
use std::io::{self, BufRead, Cursor};
|
use std::io::{self, BufRead, Cursor};
|
||||||
|
|
||||||
/// 返回 `resolv.conf` 内容的拥有者("systemd-resolved"、"NetworkManager"、"resolvconf" 或空字符串)
|
/// 返回 `resolv.conf` 内容的拥有者("systemd-resolved"、"NetworkManager"、"resolvconf" 或空字符串)
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ use std::{
|
|||||||
|
|
||||||
use base64::Engine as _;
|
use base64::Engine as _;
|
||||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||||
|
use guarden::guard;
|
||||||
use hmac::Mac;
|
use hmac::Mac;
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
|
|
||||||
@@ -40,7 +41,6 @@ use crate::{
|
|||||||
error::Error,
|
error::Error,
|
||||||
global_ctx::ArcGlobalCtx,
|
global_ctx::ArcGlobalCtx,
|
||||||
},
|
},
|
||||||
guard,
|
|
||||||
peers::peer_session::{PeerSessionStore, SessionKey, UpsertResponderSessionReturn},
|
peers::peer_session::{PeerSessionStore, SessionKey, UpsertResponderSessionReturn},
|
||||||
proto::{
|
proto::{
|
||||||
api::instance::{PeerConnInfo, PeerConnStats},
|
api::instance::{PeerConnInfo, PeerConnStats},
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
use std::sync::{Arc, Mutex, atomic::AtomicBool};
|
use std::sync::{Arc, Mutex, atomic::AtomicBool};
|
||||||
|
|
||||||
use futures::{SinkExt as _, StreamExt};
|
use futures::{SinkExt as _, StreamExt};
|
||||||
|
use guarden::defer;
|
||||||
use tokio::{task::JoinSet, time::timeout};
|
use tokio::{task::JoinSet, time::timeout};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
defer,
|
|
||||||
proto::rpc_types::error::Error,
|
proto::rpc_types::error::Error,
|
||||||
tunnel::{Tunnel, packet_def::PacketType, ring::create_ring_tunnel_pair},
|
tunnel::{Tunnel, packet_def::PacketType, ring::create_ring_tunnel_pair},
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -4,18 +4,17 @@ use std::sync::{Arc, Mutex};
|
|||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
use guarden::defer;
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
|
|
||||||
use crate::common::shrink_dashmap;
|
|
||||||
use crate::common::{
|
use crate::common::{
|
||||||
PeerId,
|
PeerId, shrink_dashmap,
|
||||||
stats_manager::{LabelSet, LabelType, MetricName, StatsManager},
|
stats_manager::{LabelSet, LabelType, MetricName, StatsManager},
|
||||||
};
|
};
|
||||||
use crate::defer;
|
|
||||||
use crate::proto::common::{
|
use crate::proto::common::{
|
||||||
CompressionAlgoPb, RpcCompressionInfo, RpcDescriptor, RpcPacket, RpcRequest, RpcResponse,
|
CompressionAlgoPb, RpcCompressionInfo, RpcDescriptor, RpcPacket, RpcRequest, RpcResponse,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,638 +0,0 @@
|
|||||||
//! # Guard Module Utilities
|
|
||||||
//!
|
|
||||||
//! This module provides mechanisms for scope-based resource management and deferred execution.
|
|
||||||
//!
|
|
||||||
//! ### ⚠️ Critical Usage Note: Diverging Expressions
|
|
||||||
//!
|
|
||||||
//! Do not use "naked" diverging expressions—such as `panic!`, `todo!`, or `loop {}`—as
|
|
||||||
//! the sole content of sync guard closure. This prevents the compiler from
|
|
||||||
//! distinguishing between synchronous (`ASYNC = false`) and asynchronous
|
|
||||||
//! (`ASYNC = true`) implementations, leading to a type inference error (E0277).
|
|
||||||
//!
|
|
||||||
//! ### Technical Context
|
|
||||||
//!
|
|
||||||
//! The `!` (Never Type) is a bottom type that can be coerced into any other type.
|
|
||||||
//! Because it satisfies both the `()` requirement for sync guards and the `Future`
|
|
||||||
//! requirement for async guards, the compiler encounters an inference deadlock.
|
|
||||||
//!
|
|
||||||
//! ### Workaround
|
|
||||||
//!
|
|
||||||
//! For macros like `guard!` or `guarded!`, force the closure to resolve to `()`
|
|
||||||
//! by explicitly setting the guard to `sync`:
|
|
||||||
//!
|
|
||||||
//! ```rust
|
|
||||||
//! let _g = guard!([val] sync {
|
|
||||||
//! panic!("critical failure");
|
|
||||||
//! });
|
|
||||||
//! ```
|
|
||||||
|
|
||||||
use crate::utils::task::{DetachableTask, TaskSpawner};
|
|
||||||
use std::fmt::Debug;
|
|
||||||
use std::mem::ManuallyDrop;
|
|
||||||
use std::ops::{Deref, DerefMut};
|
|
||||||
|
|
||||||
pub trait CallableGuard<const ASYNC: bool, Context> {
|
|
||||||
type Output;
|
|
||||||
fn call(self, context: Context) -> Self::Output;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Context, Guard> CallableGuard<false, Context> for Guard
|
|
||||||
where
|
|
||||||
Guard: FnOnce(Context),
|
|
||||||
{
|
|
||||||
type Output = ();
|
|
||||||
|
|
||||||
fn call(self, context: Context) -> Self::Output {
|
|
||||||
self(context)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Context, Guard, Task, _R> CallableGuard<true, Context> for Guard
|
|
||||||
where
|
|
||||||
Guard: FnOnce(Context) -> Task + Send + 'static,
|
|
||||||
Task: Future<Output = _R> + Send + 'static,
|
|
||||||
_R: Send + 'static,
|
|
||||||
{
|
|
||||||
type Output = DetachableTask<TaskSpawner<Task>, Task>;
|
|
||||||
|
|
||||||
fn call(self, context: Context) -> Self::Output {
|
|
||||||
DetachableTask::new(self(context))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ContextGuard<const ASYNC: bool, Context, Guard: CallableGuard<ASYNC, Context>> {
|
|
||||||
context: ManuallyDrop<Context>,
|
|
||||||
guard: ManuallyDrop<Guard>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<const ASYNC: bool, Context, Guard: CallableGuard<ASYNC, Context>> Deref
|
|
||||||
for ContextGuard<ASYNC, Context, Guard>
|
|
||||||
{
|
|
||||||
type Target = Context;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.context
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<const ASYNC: bool, Context, Guard: CallableGuard<ASYNC, Context>> DerefMut
|
|
||||||
for ContextGuard<ASYNC, Context, Guard>
|
|
||||||
{
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
&mut self.context
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<const ASYNC: bool, Context: Debug, Guard: CallableGuard<ASYNC, Context>> Debug
|
|
||||||
for ContextGuard<ASYNC, Context, Guard>
|
|
||||||
{
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
let name = if ASYNC {
|
|
||||||
"ContextGuard::Async"
|
|
||||||
} else {
|
|
||||||
"ContextGuard::Sync"
|
|
||||||
};
|
|
||||||
f.debug_struct(name)
|
|
||||||
.field("context", &self.context)
|
|
||||||
.finish_non_exhaustive()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<const ASYNC: bool, Context, Guard: CallableGuard<ASYNC, Context>>
|
|
||||||
ContextGuard<ASYNC, Context, Guard>
|
|
||||||
{
|
|
||||||
/// Creates a new `ContextGuard`.
|
|
||||||
///
|
|
||||||
/// **Note on generics:** The seemingly unused `_R` generic parameter and the
|
|
||||||
/// `Guard: FnOnce(Context) -> _R` trait bound are intentionally included.
|
|
||||||
/// They act as a hint to help the compiler infer closure types.
|
|
||||||
pub fn new<_R>(context: Context, guard: Guard) -> Self
|
|
||||||
where
|
|
||||||
Guard: FnOnce(Context) -> _R,
|
|
||||||
{
|
|
||||||
ContextGuard {
|
|
||||||
context: ManuallyDrop::new(context),
|
|
||||||
guard: ManuallyDrop::new(guard),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<const ASYNC: bool, Context, Guard: CallableGuard<ASYNC, Context>>
|
|
||||||
ContextGuard<ASYNC, Context, Guard>
|
|
||||||
{
|
|
||||||
unsafe fn call(&mut self) -> Guard::Output {
|
|
||||||
unsafe {
|
|
||||||
let context = ManuallyDrop::take(&mut self.context);
|
|
||||||
let guard = ManuallyDrop::take(&mut self.guard);
|
|
||||||
|
|
||||||
guard.call(context)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn trigger(self) -> Guard::Output {
|
|
||||||
let mut this = ManuallyDrop::new(self);
|
|
||||||
unsafe { this.call() }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn defuse(self) -> Context {
|
|
||||||
let mut this = ManuallyDrop::new(self);
|
|
||||||
unsafe {
|
|
||||||
ManuallyDrop::drop(&mut this.guard);
|
|
||||||
ManuallyDrop::take(&mut this.context)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<const ASYNC: bool, Context, Guard: CallableGuard<ASYNC, Context>> Drop
|
|
||||||
for ContextGuard<ASYNC, Context, Guard>
|
|
||||||
{
|
|
||||||
fn drop(&mut self) {
|
|
||||||
let _: Guard::Output = unsafe { self.call() };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// region macro
|
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
#[macro_export]
|
|
||||||
macro_rules! __guarded {
|
|
||||||
(@parse@action $guard:ident => $($tt:tt)*) => {
|
|
||||||
$crate::__guarded! { @parse@async action: [ @stmt $guard ] ; $($tt)* }
|
|
||||||
};
|
|
||||||
|
|
||||||
(@parse@action $($tt:tt)*) => {
|
|
||||||
$crate::__guarded! { @parse@async action: [ @stmt __guard ] ; $($tt)* }
|
|
||||||
};
|
|
||||||
|
|
||||||
(@parse@async action: [ $($action:tt)* ] ; sync $($tt:tt)*) => {
|
|
||||||
$crate::__guarded! { @parse@move action: [ $($action)* ] ; async: [ false ] ; $($tt)* }
|
|
||||||
};
|
|
||||||
|
|
||||||
(@parse@async action: [ $($action:tt)* ] ; $($tt:tt)*) => {
|
|
||||||
$crate::__guarded! { @parse@move action: [ $($action)* ] ; async: [ _ ] ; $($tt)* }
|
|
||||||
};
|
|
||||||
|
|
||||||
(@parse@move action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move $($tt:tt)*) => {
|
|
||||||
$crate::__guarded! { @parse action: [ $($action)* ] ; async: [ $async ] ; move: [ move ] ; $($tt)* }
|
|
||||||
};
|
|
||||||
|
|
||||||
(@parse@move action: [ $($action:tt)* ] ; async: [ $async:tt ] ; $($tt:tt)*) => {
|
|
||||||
$crate::__guarded! { @parse action: [ $($action)* ] ; async: [ $async ] ; move: [] ; $($tt)* }
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
@parse action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move: [ $($move:tt)? ] ;
|
|
||||||
[ $($args:tt)* ] $body:block
|
|
||||||
) => {
|
|
||||||
$crate::__guarded! {
|
|
||||||
action: [ $($action)* ]
|
|
||||||
async: [ $async ]
|
|
||||||
move: [ $($move)? ]
|
|
||||||
mut: []
|
|
||||||
rest: [ $($args)* , ]
|
|
||||||
args: []
|
|
||||||
vars: []
|
|
||||||
body: [ $body ]
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
@parse action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move: [ $($move:tt)? ] ;
|
|
||||||
$body:block
|
|
||||||
) => {
|
|
||||||
$crate::__guarded! {
|
|
||||||
@parse action: [ $($action)* ] ; async: [ $async ] ; move: [ $($move)? ] ;
|
|
||||||
[] $body
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
@parse action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move: [ $($move:tt)? ] ;
|
|
||||||
[ $($args:tt)* ] $($body:tt)*
|
|
||||||
) => {
|
|
||||||
$crate::__guarded! {
|
|
||||||
@parse action: [ $($action)* ] ; async: [ $async ] ; move: [ $($move)? ] ;
|
|
||||||
[ $($args)* ] { $($body)* }
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
@parse action: [ $($action:tt)* ] ; async: [ $async:tt ] ; move: [ $($move:tt)? ] ;
|
|
||||||
$($body:tt)*
|
|
||||||
) => {
|
|
||||||
$crate::__guarded! {
|
|
||||||
@parse action: [ $($action)* ] ; async: [ $async ] ; move: [ $($move)? ] ;
|
|
||||||
[] { $($body)* }
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
action: [ $($action:tt)* ]
|
|
||||||
async: [ $async:tt ]
|
|
||||||
move: [ $($move:tt)? ]
|
|
||||||
mut: [ $($mut:tt)? ]
|
|
||||||
rest: [ mut $arg:ident , $($rest:tt)* ]
|
|
||||||
args: [ $($args:ident)* ]
|
|
||||||
vars: [ $($vars:tt)* ]
|
|
||||||
body: [ $body:expr ]
|
|
||||||
) => {
|
|
||||||
$crate::__guarded! {
|
|
||||||
action: [ $($action)* ]
|
|
||||||
async: [ $async ]
|
|
||||||
move: [ $($move)? ]
|
|
||||||
mut: [ mut ]
|
|
||||||
rest: [ $($rest)* ]
|
|
||||||
args: [ $($args)* $arg ]
|
|
||||||
vars: [ $($vars)* [mut $arg] ]
|
|
||||||
body: [ $body ]
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
action: [ $($action:tt)* ]
|
|
||||||
async: [ $async:tt ]
|
|
||||||
move: [ $($move:tt)? ]
|
|
||||||
mut: [ $($mut:tt)? ]
|
|
||||||
rest: [ $arg:ident , $($rest:tt)* ]
|
|
||||||
args: [ $($args:ident)* ]
|
|
||||||
vars: [ $($vars:tt)* ]
|
|
||||||
body: [ $body:expr ]
|
|
||||||
) => {
|
|
||||||
$crate::__guarded! {
|
|
||||||
action: [ $($action)* ]
|
|
||||||
async: [ $async ]
|
|
||||||
move: [ $($move)? ]
|
|
||||||
mut: [ $($mut)? ]
|
|
||||||
rest: [ $($rest)* ]
|
|
||||||
args: [ $($args)* $arg ]
|
|
||||||
vars: [ $($vars)* [$arg] ]
|
|
||||||
body: [ $body ]
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
action: [ @stmt $guard:ident ]
|
|
||||||
async: [ $async:tt ]
|
|
||||||
move: [ $($move:tt)? ]
|
|
||||||
mut: [ $($mut:tt)? ]
|
|
||||||
rest: [ $(,)* ]
|
|
||||||
args: [ $($args:ident)* ]
|
|
||||||
vars: [ $([$($vars:tt)*])* ]
|
|
||||||
body: [ $body:expr ]
|
|
||||||
) => {
|
|
||||||
let $($mut)? $guard = $crate::utils::guard::ContextGuard::<$async, _, _>::new(
|
|
||||||
( $($args),* ),
|
|
||||||
$($move)? |#[allow(unused_parens, unused_mut)] ( $($($vars)*),* )| $body
|
|
||||||
);
|
|
||||||
|
|
||||||
#[allow(unused_parens, unused_variables, clippy::toplevel_ref_arg)]
|
|
||||||
let ( $(ref $($vars)*),* ) = *$guard;
|
|
||||||
};
|
|
||||||
|
|
||||||
(
|
|
||||||
action: [ @expr ]
|
|
||||||
async: [ $async:tt ]
|
|
||||||
move: [ $($move:tt)? ]
|
|
||||||
mut: [ $($mut:tt)? ]
|
|
||||||
rest: [ $(,)* ]
|
|
||||||
args: [ $($args:ident)* ]
|
|
||||||
vars: [ $([$($vars:tt)*])* ]
|
|
||||||
body: [ $body:expr ]
|
|
||||||
) => {
|
|
||||||
$crate::utils::guard::ContextGuard::<$async, _, _>::new(
|
|
||||||
( $($args),* ),
|
|
||||||
$($move)? |#[allow(unused_parens)] ( $($($vars)*),* )| $body
|
|
||||||
)
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a [`ContextGuard`] object, binding it to a variable with the specified name (e.g., `_guard`).
|
|
||||||
/// Context variables specified in the macro invocation are available within and after the guard body.
|
|
||||||
///
|
|
||||||
/// **Note:** For usage with `panic!` or `loop`, see the [module-level documentation](self)
|
|
||||||
/// regarding type inference deadlocks.
|
|
||||||
#[macro_export]
|
|
||||||
macro_rules! guarded {
|
|
||||||
( $($tt:tt)* ) => {
|
|
||||||
$crate::__guarded! { @parse@action $($tt)* }
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a [`ContextGuard`] object, without binding it to a variable.
|
|
||||||
/// Context variables specified in the macro invocation are available within the guard body.
|
|
||||||
///
|
|
||||||
/// **Note:** For usage with `panic!` or `loop`, see the [module-level documentation](self)
|
|
||||||
/// regarding type inference deadlocks.
|
|
||||||
#[macro_export]
|
|
||||||
macro_rules! guard {
|
|
||||||
( $($tt:tt)* ) => {
|
|
||||||
$crate::__guarded! { @parse@async action: [ @expr ] ; $($tt)* }
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
/// Alias for [`guarded!`].
|
|
||||||
///
|
|
||||||
/// **Note:** For usage with `panic!` or `loop`, see the [module-level documentation](self)
|
|
||||||
/// regarding type inference deadlocks.
|
|
||||||
#[macro_export]
|
|
||||||
macro_rules! defer {
|
|
||||||
( $($tt:tt)* ) => {
|
|
||||||
$crate::guarded! { $($tt)* }
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use std::panic::catch_unwind;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
||||||
use std::time::Duration;
|
|
||||||
use tokio::sync::oneshot;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn trigger_sync_executes_once() {
|
|
||||||
let called = Arc::new(AtomicUsize::new(0));
|
|
||||||
let observed = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
let value = 7usize;
|
|
||||||
let guard = {
|
|
||||||
let called = called.clone();
|
|
||||||
let observed = observed.clone();
|
|
||||||
crate::guard!(move [value] {
|
|
||||||
called.fetch_add(1, Ordering::SeqCst);
|
|
||||||
observed.store(value, Ordering::SeqCst);
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
guard.trigger();
|
|
||||||
|
|
||||||
assert_eq!(called.load(Ordering::SeqCst), 1);
|
|
||||||
assert_eq!(observed.load(Ordering::SeqCst), 7);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn defuse_sync_returns_context_without_running_guard() {
|
|
||||||
let called = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
let value = String::from("hello");
|
|
||||||
let guard = {
|
|
||||||
let called = called.clone();
|
|
||||||
crate::guard!(move [mut value] {
|
|
||||||
value.push_str(" world");
|
|
||||||
called.fetch_add(1, Ordering::SeqCst);
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
let context = guard.defuse();
|
|
||||||
assert_eq!(context, "hello");
|
|
||||||
assert_eq!(called.load(Ordering::SeqCst), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn drop_sync_triggers_guard() {
|
|
||||||
let called = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
{
|
|
||||||
let called = called.clone();
|
|
||||||
crate::guarded!([called] {
|
|
||||||
called.fetch_add(1, Ordering::SeqCst);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(called.load(Ordering::SeqCst), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn drop_propagates_guard_panic() {
|
|
||||||
let dropped = catch_unwind(|| {
|
|
||||||
guarded! {
|
|
||||||
sync {
|
|
||||||
panic!("boom");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
assert!(dropped.is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn trigger_async_returns_runnable_task() {
|
|
||||||
let called = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
let value = 5usize;
|
|
||||||
let guard = {
|
|
||||||
let called = called.clone();
|
|
||||||
crate::guard!(move [value] async move {
|
|
||||||
called.fetch_add(value, Ordering::SeqCst);
|
|
||||||
})
|
|
||||||
};
|
|
||||||
let task = guard.trigger();
|
|
||||||
task.await;
|
|
||||||
|
|
||||||
assert_eq!(called.load(Ordering::SeqCst), 5);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn drop_async_detaches_task() {
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut tx = Some(tx);
|
|
||||||
let value = 9usize;
|
|
||||||
let _guard = crate::guard!(move [value] {
|
|
||||||
let tx = tx.take();
|
|
||||||
async move {
|
|
||||||
if let Some(tx) = tx {
|
|
||||||
let _ = tx.send(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let value = tokio::time::timeout(Duration::from_secs(1), rx)
|
|
||||||
.await
|
|
||||||
.expect("detached task should run")
|
|
||||||
.expect("detached task should send value");
|
|
||||||
assert_eq!(value, 9);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn defuse_async_does_not_execute() {
|
|
||||||
let called = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
let value = 11usize;
|
|
||||||
let guard = {
|
|
||||||
let called = called.clone();
|
|
||||||
crate::guard!(move [value] async move {
|
|
||||||
called.fetch_add(value, Ordering::SeqCst);
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
let context = guard.defuse();
|
|
||||||
assert_eq!(context, 11);
|
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
|
||||||
assert_eq!(called.load(Ordering::SeqCst), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn guarded_named_mut_binding_updates_context_before_drop() {
|
|
||||||
let committed = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
{
|
|
||||||
let value = 1usize;
|
|
||||||
let step = 2usize;
|
|
||||||
let committed = committed.clone();
|
|
||||||
|
|
||||||
crate::guarded!(scope_guard => [mut value, step] {
|
|
||||||
committed.store(value + step, Ordering::SeqCst);
|
|
||||||
});
|
|
||||||
|
|
||||||
*value += 10;
|
|
||||||
assert_eq!(*value, 11);
|
|
||||||
assert_eq!(*step, 2);
|
|
||||||
|
|
||||||
drop(scope_guard);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(committed.load(Ordering::SeqCst), 13);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn guard_expression_parses_without_braces() {
|
|
||||||
let observed = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
let value = 3usize;
|
|
||||||
let observed_clone = observed.clone();
|
|
||||||
let guard = crate::guard!([value] observed_clone.store(value, Ordering::SeqCst));
|
|
||||||
guard.trigger();
|
|
||||||
|
|
||||||
assert_eq!(observed.load(Ordering::SeqCst), 3);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn defer_alias_behaves_like_guarded_statement() {
|
|
||||||
let called = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
{
|
|
||||||
let n = 42usize;
|
|
||||||
let called = called.clone();
|
|
||||||
crate::defer!([n] {
|
|
||||||
called.store(n, Ordering::SeqCst);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(called.load(Ordering::SeqCst), 42);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn guard_and_guarded_macro_usage_matrix() {
|
|
||||||
// 1) guard!: block body + trailing comma args + trigger()
|
|
||||||
let sink = Arc::new(AtomicUsize::new(0));
|
|
||||||
let v = 1usize;
|
|
||||||
let sink_clone = sink.clone();
|
|
||||||
let g1 = crate::guard!([v,] {
|
|
||||||
sink_clone.store(v, Ordering::SeqCst);
|
|
||||||
});
|
|
||||||
g1.trigger();
|
|
||||||
assert_eq!(sink.load(Ordering::SeqCst), 1);
|
|
||||||
|
|
||||||
// 2) guard!: expression body (no braces)
|
|
||||||
let sink = Arc::new(AtomicUsize::new(0));
|
|
||||||
let sink_clone = sink.clone();
|
|
||||||
let v = 2usize;
|
|
||||||
let g2 = crate::guard!([v] sink_clone.store(v, Ordering::SeqCst));
|
|
||||||
g2.trigger();
|
|
||||||
assert_eq!(sink.load(Ordering::SeqCst), 2);
|
|
||||||
|
|
||||||
// 3) guard!: explicit sync + no args form
|
|
||||||
let sink = Arc::new(AtomicUsize::new(0));
|
|
||||||
let sink_clone = sink.clone();
|
|
||||||
let g3 = crate::guard!(sync {
|
|
||||||
sink_clone.store(3, Ordering::SeqCst);
|
|
||||||
});
|
|
||||||
g3.trigger();
|
|
||||||
assert_eq!(sink.load(Ordering::SeqCst), 3);
|
|
||||||
|
|
||||||
// 4) guard!: move capture + defuse() prevents execution
|
|
||||||
let sink = Arc::new(AtomicUsize::new(0));
|
|
||||||
let owned = String::from("owned");
|
|
||||||
let sink_clone = sink.clone();
|
|
||||||
let g4 = crate::guard!(move [owned] {
|
|
||||||
if owned == "owned" {
|
|
||||||
sink_clone.store(4, Ordering::SeqCst);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
let context = g4.defuse();
|
|
||||||
assert_eq!(context, "owned");
|
|
||||||
assert_eq!(sink.load(Ordering::SeqCst), 0);
|
|
||||||
|
|
||||||
// 5) guard!: async block inference + trigger() returns task
|
|
||||||
let sink = Arc::new(AtomicUsize::new(0));
|
|
||||||
let sink_clone = sink.clone();
|
|
||||||
let n = 5usize;
|
|
||||||
let g5 = crate::guard!([n] async move {
|
|
||||||
sink_clone.fetch_add(n, Ordering::SeqCst);
|
|
||||||
});
|
|
||||||
g5.trigger().await;
|
|
||||||
assert_eq!(sink.load(Ordering::SeqCst), 5);
|
|
||||||
|
|
||||||
// 6) guarded!: named binding + mut arg visible outside + explicit drop
|
|
||||||
let sink = Arc::new(AtomicUsize::new(0));
|
|
||||||
{
|
|
||||||
let value = 6usize;
|
|
||||||
let delta = 1usize;
|
|
||||||
let sink_clone = sink.clone();
|
|
||||||
|
|
||||||
crate::guarded!(named => [mut value, delta] {
|
|
||||||
sink_clone.store(value + delta, Ordering::SeqCst);
|
|
||||||
});
|
|
||||||
|
|
||||||
*value += 10;
|
|
||||||
assert_eq!(*value, 16);
|
|
||||||
assert_eq!(*delta, 1);
|
|
||||||
drop(named);
|
|
||||||
}
|
|
||||||
assert_eq!(sink.load(Ordering::SeqCst), 17);
|
|
||||||
|
|
||||||
// 7) guarded!: unnamed statement + expression body + implicit drop at scope end
|
|
||||||
let sink = Arc::new(AtomicUsize::new(0));
|
|
||||||
{
|
|
||||||
let n = 7usize;
|
|
||||||
let sink_clone = sink.clone();
|
|
||||||
crate::guarded!([n] sink_clone.store(n, Ordering::SeqCst));
|
|
||||||
}
|
|
||||||
assert_eq!(sink.load(Ordering::SeqCst), 7);
|
|
||||||
|
|
||||||
// 8) guarded!: explicit sync + panic path propagates on drop
|
|
||||||
let dropped = catch_unwind(|| {
|
|
||||||
guarded! {
|
|
||||||
sync {
|
|
||||||
panic!("matrix-boom");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(dropped.is_err());
|
|
||||||
|
|
||||||
// 9) guarded!: async inference on drop detaches and executes
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
{
|
|
||||||
let tx = Some(tx);
|
|
||||||
crate::guarded!([mut tx] {
|
|
||||||
let tx = tx.take();
|
|
||||||
async move {
|
|
||||||
if let Some(tx) = tx {
|
|
||||||
let _ = tx.send(9usize);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
let detached = tokio::time::timeout(Duration::from_secs(1), rx)
|
|
||||||
.await
|
|
||||||
.expect("detached task should complete")
|
|
||||||
.expect("detached task should send value");
|
|
||||||
assert_eq!(detached, 9);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,4 +1,3 @@
|
|||||||
pub mod guard;
|
|
||||||
pub mod panic;
|
pub mod panic;
|
||||||
pub mod string;
|
pub mod string;
|
||||||
pub mod task;
|
pub mod task;
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
use crate::utils::guard::ContextGuard;
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::ops::DerefMut;
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -80,284 +78,3 @@ impl<Output> Future for CancellableTask<Output> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// endregion
|
// endregion
|
||||||
|
|
||||||
// region DetachableTask
|
|
||||||
|
|
||||||
/// A pinned, heap-allocated task.
|
|
||||||
///
|
|
||||||
/// **Why Box?** Heap allocation is required because if the task detaches,
|
|
||||||
/// it outlives the current stack frame. `Pin<Box<_>>` ensures its memory address
|
|
||||||
/// remains completely stable during and after the transfer.
|
|
||||||
type BoxTask<Task> = Pin<Box<Task>>;
|
|
||||||
|
|
||||||
struct DetachableTaskContext<Spawner, Task> {
|
|
||||||
spawner: Spawner,
|
|
||||||
task: Option<BoxTask<Task>>,
|
|
||||||
}
|
|
||||||
type DetachableTaskGuardHelper<Context> = ContextGuard<false, Context, fn(Context)>;
|
|
||||||
type DetachableTaskGuard<Spawner, Task> =
|
|
||||||
DetachableTaskGuardHelper<DetachableTaskContext<Spawner, Task>>;
|
|
||||||
|
|
||||||
/// A task wrapper that executes inline but automatically detaches to a background spawner
|
|
||||||
/// if the current execution context is interrupted or dropped.
|
|
||||||
///
|
|
||||||
/// `DetachableTask` ensures anti-cancellation. If the outer future is dropped (e.g., due to
|
|
||||||
/// a timeout or a `select!` branch failing), the underlying unfinished task is seamlessly
|
|
||||||
/// transferred to a background executor via an RAII guard.
|
|
||||||
///
|
|
||||||
/// # Advantages over `tokio::spawn` + `.await JoinHandle`
|
|
||||||
///
|
|
||||||
/// 1. **Zero Initial Scheduling Overhead**: Prioritizes inline execution. If the task
|
|
||||||
/// completes before being interrupted, it entirely bypasses the runtime's scheduling queue,
|
|
||||||
/// eliminating queuing latency and context-switching CPU costs. Spawning is strictly a fallback.
|
|
||||||
///
|
|
||||||
/// 2. **Context Locality**: Before detachment, the task is polled directly by the caller's thread.
|
|
||||||
/// This implicitly preserves the current execution context, including thread-local storage (TLS),
|
|
||||||
/// Tokio `task_local!` variables, and `tracing` spans, which would otherwise be immediately
|
|
||||||
/// lost or require explicit propagation across task boundaries.
|
|
||||||
pub struct DetachableTask<Spawner, Task> {
|
|
||||||
guard: DetachableTaskGuard<Spawner, Task>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Spawner, Task> DetachableTask<Spawner, Task> {
|
|
||||||
pub fn detach(self) {
|
|
||||||
self.guard.trigger()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn reclaim(self) -> BoxTask<Task> {
|
|
||||||
self.guard.defuse().task.unwrap()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type TaskSpawner<Task, R = JoinHandle<<Task as Future>::Output>> = fn(BoxTask<Task>) -> R;
|
|
||||||
|
|
||||||
impl DetachableTask<fn(()), ()> {
|
|
||||||
pub fn with_spawner<Spawner, _R, Task>(
|
|
||||||
spawner: Spawner,
|
|
||||||
task: Task,
|
|
||||||
) -> DetachableTask<Spawner, Task>
|
|
||||||
where
|
|
||||||
Spawner: FnOnce(BoxTask<Task>) -> _R,
|
|
||||||
{
|
|
||||||
let context = DetachableTaskContext {
|
|
||||||
spawner,
|
|
||||||
task: Some(Box::pin(task)),
|
|
||||||
};
|
|
||||||
DetachableTask {
|
|
||||||
guard: crate::guard!([context] if let Some(task) = context.task {
|
|
||||||
(context.spawner)(task);
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new<Task>(task: Task) -> DetachableTask<TaskSpawner<Task>, Task>
|
|
||||||
where
|
|
||||||
Task: Future + Send + 'static,
|
|
||||||
<Task as Future>::Output: Send + 'static,
|
|
||||||
{
|
|
||||||
Self::with_spawner(|task| tokio::runtime::Handle::current().spawn(task), task)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Spawner: FnOnce(BoxTask<Task>) -> _R, _R, Task> IntoFuture for DetachableTask<Spawner, Task>
|
|
||||||
where
|
|
||||||
Task: Future,
|
|
||||||
{
|
|
||||||
type Output = Task::Output;
|
|
||||||
type IntoFuture = DetachableTaskFuture<Spawner, Task>;
|
|
||||||
|
|
||||||
fn into_future(self) -> Self::IntoFuture {
|
|
||||||
DetachableTaskFuture { guard: self.guard }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DetachableTaskFuture<Spawner, Task> {
|
|
||||||
guard: DetachableTaskGuard<Spawner, Task>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Spawner: FnOnce(BoxTask<Task>) -> _R, _R, Task> Future for DetachableTaskFuture<Spawner, Task>
|
|
||||||
where
|
|
||||||
Task: Future,
|
|
||||||
{
|
|
||||||
type Output = Task::Output;
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
// SAFETY:
|
|
||||||
// 1. We only access the outer struct's unpinned fields.
|
|
||||||
// 2. The inner task remains securely pinned on the heap via `BoxTask<Task>`.
|
|
||||||
// 3. We never expose a mutable, unpinned reference to the underlying task.
|
|
||||||
let this = unsafe { self.get_unchecked_mut() };
|
|
||||||
let context = this.guard.deref_mut();
|
|
||||||
let mut task = context.task.take().expect("polled after completion");
|
|
||||||
let poll = task.as_mut().poll(cx);
|
|
||||||
if poll.is_pending() {
|
|
||||||
context.task = Some(task);
|
|
||||||
}
|
|
||||||
poll
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
|
||||||
use std::time::Duration;
|
|
||||||
use tokio::sync::{mpsc, oneshot};
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn spawn_when_dropped() {
|
|
||||||
let spawned = Arc::new(AtomicBool::new(false));
|
|
||||||
{
|
|
||||||
let spawned = spawned.clone();
|
|
||||||
let _task = DetachableTask::new(async move {
|
|
||||||
spawned.store(true, Ordering::SeqCst);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::time::timeout(Duration::from_secs(1), async {
|
|
||||||
while !spawned.load(Ordering::SeqCst) {
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.expect("task should be spawned on drop");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn await_completed_task_does_not_detach() {
|
|
||||||
let spawn_count = Arc::new(AtomicUsize::new(0));
|
|
||||||
let result = {
|
|
||||||
let spawn_count = spawn_count.clone();
|
|
||||||
DetachableTask::with_spawner(
|
|
||||||
move |_| {
|
|
||||||
spawn_count.fetch_add(1, Ordering::SeqCst);
|
|
||||||
},
|
|
||||||
async { 7usize },
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
};
|
|
||||||
|
|
||||||
assert_eq!(result, 7);
|
|
||||||
assert_eq!(spawn_count.load(Ordering::SeqCst), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn drop_without_await_and_runs_once() {
|
|
||||||
let spawn_count = Arc::new(AtomicUsize::new(0));
|
|
||||||
let (done_tx, done_rx) = oneshot::channel();
|
|
||||||
|
|
||||||
{
|
|
||||||
let spawn_count = spawn_count.clone();
|
|
||||||
let _task = DetachableTask::with_spawner(
|
|
||||||
move |f| {
|
|
||||||
spawn_count.fetch_add(1, Ordering::SeqCst);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let result = f.await;
|
|
||||||
let _ = done_tx.send(result);
|
|
||||||
});
|
|
||||||
},
|
|
||||||
async { 42usize },
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let detached_result = tokio::time::timeout(Duration::from_secs(1), done_rx)
|
|
||||||
.await
|
|
||||||
.expect("detached task should finish")
|
|
||||||
.expect("detached task should send result");
|
|
||||||
|
|
||||||
assert_eq!(detached_result, 42);
|
|
||||||
assert_eq!(spawn_count.load(Ordering::SeqCst), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn drop_after_await_still_detaches() {
|
|
||||||
let spawn_count = Arc::new(AtomicUsize::new(0));
|
|
||||||
let (value_tx, mut value_rx) = mpsc::channel(4);
|
|
||||||
let (done_tx, done_rx) = oneshot::channel();
|
|
||||||
|
|
||||||
let handle = {
|
|
||||||
let future = async move {
|
|
||||||
let mut sum = 0;
|
|
||||||
while let Some(value) = value_rx.recv().await {
|
|
||||||
sum += value;
|
|
||||||
}
|
|
||||||
sum
|
|
||||||
};
|
|
||||||
|
|
||||||
let spawn_count = spawn_count.clone();
|
|
||||||
let task = DetachableTask::with_spawner(
|
|
||||||
move |f| {
|
|
||||||
spawn_count.fetch_add(1, Ordering::SeqCst);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let result = f.await;
|
|
||||||
let _ = done_tx.send(result);
|
|
||||||
});
|
|
||||||
},
|
|
||||||
future,
|
|
||||||
);
|
|
||||||
|
|
||||||
tokio::spawn(task.into_future())
|
|
||||||
};
|
|
||||||
|
|
||||||
value_tx
|
|
||||||
.send(10)
|
|
||||||
.await
|
|
||||||
.expect("value receiver should still exist");
|
|
||||||
handle.abort();
|
|
||||||
value_tx
|
|
||||||
.send(11)
|
|
||||||
.await
|
|
||||||
.expect("value receiver should still exist");
|
|
||||||
drop(value_tx);
|
|
||||||
|
|
||||||
let detached_result = tokio::time::timeout(Duration::from_secs(1), done_rx)
|
|
||||||
.await
|
|
||||||
.expect("detached polled task should finish")
|
|
||||||
.expect("detached polled task should send result");
|
|
||||||
|
|
||||||
assert_eq!(detached_result, 21);
|
|
||||||
assert_eq!(spawn_count.load(Ordering::SeqCst), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn panic_during_inline_poll_does_not_detach_on_drop() {
|
|
||||||
struct PanicOnPollFuture {
|
|
||||||
poll_count: Arc<AtomicUsize>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Future for PanicOnPollFuture {
|
|
||||||
type Output = ();
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
self.poll_count.fetch_add(1, Ordering::SeqCst);
|
|
||||||
panic!("panic during inline poll")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let poll_count = Arc::new(AtomicUsize::new(0));
|
|
||||||
let detach_count = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
let task = {
|
|
||||||
let detach_count = detach_count.clone();
|
|
||||||
DetachableTask::with_spawner(
|
|
||||||
move |_| {
|
|
||||||
detach_count.fetch_add(1, Ordering::SeqCst);
|
|
||||||
},
|
|
||||||
PanicOnPollFuture {
|
|
||||||
poll_count: poll_count.clone(),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
let err = tokio::spawn(task.into_future())
|
|
||||||
.await
|
|
||||||
.expect_err("inline poll panic should propagate");
|
|
||||||
|
|
||||||
assert!(err.is_panic());
|
|
||||||
assert_eq!(poll_count.load(Ordering::SeqCst), 1);
|
|
||||||
assert_eq!(detach_count.load(Ordering::SeqCst), 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user