mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-07 02:09:06 +00:00
feat(web): implement secure core-web tunnel with Noise protocol (#1976)
Implement end-to-end encryption for core-web connections using the Noise protocol framework with the following changes: Client-side (easytier/src/web_client/): - Add security.rs module with Noise handshake implementation - Add upgrade_client_tunnel() for client-side handshake - Add Noise frame encryption/decryption via TunnelFilter - Integrate GetFeature RPC for capability negotiation - Support secure_mode option to enforce encrypted connections - Handle graceful fallback for backward compatibility Server-side (easytier-web/): - Accept Noise handshake in client_manager - Expose encryption support via GetFeature RPC The implementation uses Noise_NN_25519_ChaChaPoly_SHA256 pattern for encryption without authentication. Provides backward compatibility with automatic fallback to plaintext connections.
This commit is contained in:
@@ -472,11 +472,17 @@ async fn init_web_client(app: AppHandle, url: Option<String>) -> Result<(), Stri
|
|||||||
|
|
||||||
let hooks = Arc::new(manager::GuiHooks { app: app.clone() });
|
let hooks = Arc::new(manager::GuiHooks { app: app.clone() });
|
||||||
|
|
||||||
let web_client =
|
let web_client = web_client::run_web_client(
|
||||||
web_client::run_web_client(url.as_str(), None, None, instance_manager, Some(hooks))
|
url.as_str(),
|
||||||
.await
|
None,
|
||||||
.with_context(|| "Failed to initialize web client")
|
None,
|
||||||
.map_err(|e| format!("{:#}", e))?;
|
false,
|
||||||
|
instance_manager,
|
||||||
|
Some(hooks),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.with_context(|| "Failed to initialize web client")
|
||||||
|
.map_err(|e| format!("{:#}", e))?;
|
||||||
*web_client_guard = Some(web_client);
|
*web_client_guard = Some(web_client);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ use easytier::{
|
|||||||
},
|
},
|
||||||
rpc_service::remote_client::{self, RemoteClientManager},
|
rpc_service::remote_client::{self, RemoteClientManager},
|
||||||
tunnel::TunnelListener,
|
tunnel::TunnelListener,
|
||||||
|
web_client::security,
|
||||||
};
|
};
|
||||||
use maxminddb::geoip2;
|
use maxminddb::geoip2;
|
||||||
use session::{Location, Session};
|
use session::{Location, Session};
|
||||||
@@ -99,12 +100,20 @@ impl ClientManager {
|
|||||||
let feature_flags = self.feature_flags.clone();
|
let feature_flags = self.feature_flags.clone();
|
||||||
self.tasks.spawn(async move {
|
self.tasks.spawn(async move {
|
||||||
while let Ok(tunnel) = listener.accept().await {
|
while let Ok(tunnel) = listener.accept().await {
|
||||||
|
let (tunnel, secure) = match security::accept_or_upgrade_server_tunnel(tunnel).await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(error) => {
|
||||||
|
tracing::warn!(%error, "failed to accept secure tunnel, dropping connection");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
let info = tunnel.info().unwrap();
|
let info = tunnel.info().unwrap();
|
||||||
let client_url: url::Url = info.remote_addr.unwrap().into();
|
let client_url: url::Url = info.remote_addr.unwrap().into();
|
||||||
let location = Self::lookup_location(&client_url, geoip_db.clone());
|
let location = Self::lookup_location(&client_url, geoip_db.clone());
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"New session from {:?}, location: {:?}",
|
"New session from {:?}, secure: {}, location: {:?}",
|
||||||
client_url,
|
client_url,
|
||||||
|
secure,
|
||||||
location
|
location
|
||||||
);
|
);
|
||||||
let mut session = Session::new(
|
let mut session = Session::new(
|
||||||
@@ -326,26 +335,36 @@ mod tests {
|
|||||||
connector,
|
connector,
|
||||||
"test",
|
"test",
|
||||||
"test",
|
"test",
|
||||||
|
false,
|
||||||
Arc::new(NetworkInstanceManager::new()),
|
Arc::new(NetworkInstanceManager::new()),
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
|
||||||
wait_for_condition(
|
wait_for_condition(
|
||||||
|| async { mgr.client_sessions.len() == 1 },
|
|| async { !mgr.client_sessions.is_empty() },
|
||||||
Duration::from_secs(6),
|
Duration::from_secs(12),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mut a = mgr
|
let req = tokio::time::timeout(Duration::from_secs(12), async {
|
||||||
.client_sessions
|
loop {
|
||||||
.iter()
|
let session = mgr
|
||||||
.next()
|
.client_sessions
|
||||||
.unwrap()
|
.iter()
|
||||||
.data()
|
.next()
|
||||||
.read()
|
.map(|item| item.value().clone());
|
||||||
.await
|
let Some(session) = session else {
|
||||||
.heartbeat_waiter();
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
let req = a.recv().await.unwrap();
|
continue;
|
||||||
|
};
|
||||||
|
let mut waiter = session.data().read().await.heartbeat_waiter();
|
||||||
|
if let Ok(req) = waiter.recv().await {
|
||||||
|
break req;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
println!("{:?}", req);
|
println!("{:?}", req);
|
||||||
println!("{:?}", mgr);
|
println!("{:?}", mgr);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -169,6 +169,16 @@ impl WebServerService for SessionRpcService {
|
|||||||
}
|
}
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_feature(
|
||||||
|
&self,
|
||||||
|
_: BaseController,
|
||||||
|
_: easytier::proto::web::GetFeatureRequest,
|
||||||
|
) -> rpc_types::error::Result<easytier::proto::web::GetFeatureResponse> {
|
||||||
|
Ok(easytier::proto::web::GetFeatureResponse {
|
||||||
|
support_encryption: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Session {
|
pub struct Session {
|
||||||
|
|||||||
@@ -1281,6 +1281,7 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> {
|
|||||||
config_server_url_s,
|
config_server_url_s,
|
||||||
cli.machine_id.clone(),
|
cli.machine_id.clone(),
|
||||||
cli.network_options.hostname.clone(),
|
cli.network_options.hostname.clone(),
|
||||||
|
cli.network_options.secure_mode.unwrap_or(false),
|
||||||
manager.clone(),
|
manager.clone(),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -18,6 +18,13 @@ message HeartbeatRequest {
|
|||||||
|
|
||||||
message HeartbeatResponse {}
|
message HeartbeatResponse {}
|
||||||
|
|
||||||
|
message GetFeatureRequest {}
|
||||||
|
|
||||||
|
message GetFeatureResponse {
|
||||||
|
bool support_encryption = 1;
|
||||||
|
}
|
||||||
|
|
||||||
service WebServerService {
|
service WebServerService {
|
||||||
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
|
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
|
||||||
|
rpc GetFeature(GetFeatureRequest) returns (GetFeatureResponse);
|
||||||
}
|
}
|
||||||
@@ -36,6 +36,7 @@ pub struct DefaultHooks;
|
|||||||
impl WebClientHooks for DefaultHooks {}
|
impl WebClientHooks for DefaultHooks {}
|
||||||
|
|
||||||
pub mod controller;
|
pub mod controller;
|
||||||
|
pub mod security;
|
||||||
pub mod session;
|
pub mod session;
|
||||||
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
@@ -52,6 +53,7 @@ impl WebClient {
|
|||||||
connector: T,
|
connector: T,
|
||||||
token: S,
|
token: S,
|
||||||
hostname: H,
|
hostname: H,
|
||||||
|
secure_mode: bool,
|
||||||
manager: Arc<NetworkInstanceManager>,
|
manager: Arc<NetworkInstanceManager>,
|
||||||
hooks: Option<Arc<dyn WebClientHooks>>,
|
hooks: Option<Arc<dyn WebClientHooks>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
@@ -68,7 +70,13 @@ impl WebClient {
|
|||||||
let controller_clone = controller.clone();
|
let controller_clone = controller.clone();
|
||||||
let connected_clone = connected.clone();
|
let connected_clone = connected.clone();
|
||||||
let tasks = ScopedTask::from(tokio::spawn(async move {
|
let tasks = ScopedTask::from(tokio::spawn(async move {
|
||||||
Self::routine(controller_clone, connected_clone, Box::new(connector)).await;
|
Self::routine(
|
||||||
|
controller_clone,
|
||||||
|
connected_clone,
|
||||||
|
secure_mode,
|
||||||
|
Box::new(connector),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
WebClient {
|
WebClient {
|
||||||
@@ -82,6 +90,7 @@ impl WebClient {
|
|||||||
async fn routine(
|
async fn routine(
|
||||||
controller: Arc<controller::Controller>,
|
controller: Arc<controller::Controller>,
|
||||||
connected: Arc<AtomicBool>,
|
connected: Arc<AtomicBool>,
|
||||||
|
secure_mode: bool,
|
||||||
mut connector: Box<dyn TunnelConnector>,
|
mut connector: Box<dyn TunnelConnector>,
|
||||||
) {
|
) {
|
||||||
loop {
|
loop {
|
||||||
@@ -99,6 +108,65 @@ impl WebClient {
|
|||||||
log::info!("Successfully connected to {:?}", conn.info());
|
log::info!("Successfully connected to {:?}", conn.info());
|
||||||
|
|
||||||
let mut session = session::Session::new(conn, controller.clone());
|
let mut session = session::Session::new(conn, controller.clone());
|
||||||
|
let support_encryption = match tokio::time::timeout(
|
||||||
|
std::time::Duration::from_secs(3),
|
||||||
|
session.get_feature(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(Ok(feature)) => feature.support_encryption,
|
||||||
|
Ok(Err(error)) => {
|
||||||
|
log::warn!(%error, "GetFeature rpc failed, fallback to legacy tunnel");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
log::warn!("GetFeature rpc timeout, fallback to legacy tunnel");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if support_encryption {
|
||||||
|
log::info!("Server supports encryption, reconnecting with secure tunnel");
|
||||||
|
drop(session);
|
||||||
|
|
||||||
|
let conn = match connector.connect().await {
|
||||||
|
Ok(conn) => conn,
|
||||||
|
Err(error) => {
|
||||||
|
connected.store(false, Ordering::Release);
|
||||||
|
let wait = 1;
|
||||||
|
log::warn!(%error, "Failed to reconnect secure tunnel, retrying in {} seconds...", wait);
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(wait)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let conn = match security::upgrade_client_tunnel(conn).await {
|
||||||
|
Ok(conn) => conn,
|
||||||
|
Err(error) => {
|
||||||
|
connected.store(false, Ordering::Release);
|
||||||
|
let wait = 1;
|
||||||
|
log::warn!(%error, "Noise handshake failed, retrying in {} seconds...", wait);
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(wait)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut session = session::Session::new(conn, controller.clone());
|
||||||
|
session.start_heartbeat().await;
|
||||||
|
session.wait().await;
|
||||||
|
connected.store(false, Ordering::Release);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if secure_mode {
|
||||||
|
connected.store(false, Ordering::Release);
|
||||||
|
let wait = 1;
|
||||||
|
log::warn!("secure-mode enabled but server does not support encryption, retrying in {} seconds...", wait);
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(wait)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
session.start_heartbeat().await;
|
||||||
session.wait().await;
|
session.wait().await;
|
||||||
connected.store(false, Ordering::Release);
|
connected.store(false, Ordering::Release);
|
||||||
}
|
}
|
||||||
@@ -113,6 +181,7 @@ pub async fn run_web_client(
|
|||||||
config_server_url_s: &str,
|
config_server_url_s: &str,
|
||||||
machine_id: Option<String>,
|
machine_id: Option<String>,
|
||||||
hostname: Option<String>,
|
hostname: Option<String>,
|
||||||
|
secure_mode: bool,
|
||||||
manager: Arc<NetworkInstanceManager>,
|
manager: Arc<NetworkInstanceManager>,
|
||||||
hooks: Option<Arc<dyn WebClientHooks>>,
|
hooks: Option<Arc<dyn WebClientHooks>>,
|
||||||
) -> Result<WebClient> {
|
) -> Result<WebClient> {
|
||||||
@@ -160,6 +229,7 @@ pub async fn run_web_client(
|
|||||||
create_connector_by_url(c_url.as_str(), &global_ctx, IpVersion::Both).await?,
|
create_connector_by_url(c_url.as_str(), &global_ctx, IpVersion::Both).await?,
|
||||||
token.to_string(),
|
token.to_string(),
|
||||||
hostname,
|
hostname,
|
||||||
|
secure_mode,
|
||||||
manager.clone(),
|
manager.clone(),
|
||||||
hooks,
|
hooks,
|
||||||
))
|
))
|
||||||
@@ -178,6 +248,7 @@ mod tests {
|
|||||||
format!("ring://{}/test", uuid::Uuid::new_v4()).as_str(),
|
format!("ring://{}/test", uuid::Uuid::new_v4()).as_str(),
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
|
false,
|
||||||
manager.clone(),
|
manager.clone(),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -0,0 +1,229 @@
|
|||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use bytes::BytesMut;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use snow::{params::NoiseParams, Builder, TransportState};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
proto::common::TunnelInfo,
|
||||||
|
tunnel::{
|
||||||
|
filter::{TunnelFilter, TunnelWithFilter},
|
||||||
|
packet_def::{PacketType, ZCPacket, ZCPacketType},
|
||||||
|
SplitTunnel, StreamItem, Tunnel, TunnelError, ZCPacketSink, ZCPacketStream,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const NOISE_MAGIC: &[u8] = b"ET_WEB_NOISE_V1:";
|
||||||
|
const NOISE_PROLOGUE: &[u8] = b"easytier-webclient-noise-v1";
|
||||||
|
const NOISE_PATTERN: &str = "Noise_NN_25519_ChaChaPoly_SHA256";
|
||||||
|
|
||||||
|
struct RawSplitTunnel {
|
||||||
|
info: Option<TunnelInfo>,
|
||||||
|
split: Mutex<Option<SplitTunnel>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RawSplitTunnel {
|
||||||
|
fn new(
|
||||||
|
info: Option<TunnelInfo>,
|
||||||
|
stream: std::pin::Pin<Box<dyn ZCPacketStream>>,
|
||||||
|
sink: std::pin::Pin<Box<dyn ZCPacketSink>>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
info,
|
||||||
|
split: Mutex::new(Some((stream, sink))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Tunnel for RawSplitTunnel {
|
||||||
|
fn split(&self) -> SplitTunnel {
|
||||||
|
self.split
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.take()
|
||||||
|
.expect("split can only be called once")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn info(&self) -> Option<TunnelInfo> {
|
||||||
|
self.info.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct NoiseTunnelFilter {
|
||||||
|
transport: Arc<Mutex<TransportState>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TunnelFilter for NoiseTunnelFilter {
|
||||||
|
type FilterOutput = ();
|
||||||
|
|
||||||
|
fn before_send(&self, data: ZCPacket) -> Option<ZCPacket> {
|
||||||
|
let plain = data.tunnel_payload();
|
||||||
|
let mut encrypted = vec![0u8; plain.len() + 64];
|
||||||
|
let len = self
|
||||||
|
.transport
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.write_message(plain, &mut encrypted)
|
||||||
|
.ok()?;
|
||||||
|
let mut packet = ZCPacket::new_with_payload(&encrypted[..len]);
|
||||||
|
packet.fill_peer_manager_hdr(0, 0, PacketType::Data as u8);
|
||||||
|
Some(packet)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn after_received(&self, data: StreamItem) -> Option<StreamItem> {
|
||||||
|
let packet = match data {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => return Some(Err(e)),
|
||||||
|
};
|
||||||
|
let cipher = packet.payload();
|
||||||
|
let mut plain = vec![0u8; cipher.len() + 64];
|
||||||
|
let len = match self
|
||||||
|
.transport
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.read_message(cipher, &mut plain)
|
||||||
|
{
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
return Some(Err(TunnelError::InvalidPacket(format!(
|
||||||
|
"noise decrypt failed: {e}"
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Some(Ok(ZCPacket::new_from_buf(
|
||||||
|
BytesMut::from(&plain[..len]),
|
||||||
|
ZCPacketType::DummyTunnel,
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn filter_output(&self) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pack_control_packet(payload: &[u8]) -> ZCPacket {
|
||||||
|
let mut packet = ZCPacket::new_with_payload(payload);
|
||||||
|
packet.fill_peer_manager_hdr(0, 0, PacketType::Data as u8);
|
||||||
|
packet
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encode_noise_payload(buf: &[u8]) -> Vec<u8> {
|
||||||
|
let mut payload = Vec::with_capacity(NOISE_MAGIC.len() + buf.len());
|
||||||
|
payload.extend_from_slice(NOISE_MAGIC);
|
||||||
|
payload.extend_from_slice(buf);
|
||||||
|
payload
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode_noise_payload(payload: &[u8]) -> Option<&[u8]> {
|
||||||
|
payload.strip_prefix(NOISE_MAGIC)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn wrap_secure_tunnel(
|
||||||
|
info: Option<TunnelInfo>,
|
||||||
|
stream: std::pin::Pin<Box<dyn ZCPacketStream>>,
|
||||||
|
sink: std::pin::Pin<Box<dyn ZCPacketSink>>,
|
||||||
|
transport: TransportState,
|
||||||
|
) -> Box<dyn Tunnel> {
|
||||||
|
let raw = RawSplitTunnel::new(info, stream, sink);
|
||||||
|
Box::new(TunnelWithFilter::new(
|
||||||
|
raw,
|
||||||
|
NoiseTunnelFilter {
|
||||||
|
transport: Arc::new(Mutex::new(transport)),
|
||||||
|
},
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn upgrade_client_tunnel(
|
||||||
|
tunnel: Box<dyn Tunnel>,
|
||||||
|
) -> Result<Box<dyn Tunnel>, TunnelError> {
|
||||||
|
let info = tunnel.info();
|
||||||
|
let (mut stream, mut sink) = tunnel.split();
|
||||||
|
|
||||||
|
let params: NoiseParams = NOISE_PATTERN
|
||||||
|
.parse()
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("parse noise params failed: {e}")))?;
|
||||||
|
let mut state = Builder::new(params)
|
||||||
|
.prologue(NOISE_PROLOGUE)
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("set prologue failed: {e}")))?
|
||||||
|
.build_initiator()
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("build initiator failed: {e}")))?;
|
||||||
|
|
||||||
|
let mut msg1 = vec![0u8; 1024];
|
||||||
|
let msg1_len = state
|
||||||
|
.write_message(&[], &mut msg1)
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("write noise msg1 failed: {e}")))?;
|
||||||
|
sink.send(pack_control_packet(&encode_noise_payload(
|
||||||
|
&msg1[..msg1_len],
|
||||||
|
)))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let msg2_packet = stream.next().await.ok_or(TunnelError::Shutdown)??;
|
||||||
|
let msg2_cipher = decode_noise_payload(msg2_packet.payload())
|
||||||
|
.ok_or_else(|| TunnelError::InvalidPacket("invalid noise msg2 magic".to_string()))?;
|
||||||
|
let mut msg2 = vec![0u8; 1024];
|
||||||
|
state
|
||||||
|
.read_message(msg2_cipher, &mut msg2)
|
||||||
|
.map_err(|e| TunnelError::InvalidPacket(format!("read noise msg2 failed: {e}")))?;
|
||||||
|
|
||||||
|
let transport = state
|
||||||
|
.into_transport_mode()
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("switch transport mode failed: {e}")))?;
|
||||||
|
|
||||||
|
Ok(wrap_secure_tunnel(info, stream, sink, transport))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn accept_or_upgrade_server_tunnel(
|
||||||
|
tunnel: Box<dyn Tunnel>,
|
||||||
|
) -> Result<(Box<dyn Tunnel>, bool), TunnelError> {
|
||||||
|
let info = tunnel.info();
|
||||||
|
let (stream, sink) = tunnel.split();
|
||||||
|
let mut stream = stream;
|
||||||
|
let mut sink = sink;
|
||||||
|
|
||||||
|
let first_packet = match tokio::time::timeout(Duration::from_secs(1), stream.next()).await {
|
||||||
|
Ok(Some(Ok(packet))) => packet,
|
||||||
|
Ok(Some(Err(error))) => return Err(error),
|
||||||
|
Ok(None) => return Err(TunnelError::Shutdown),
|
||||||
|
Err(_) => {
|
||||||
|
return Ok((
|
||||||
|
Box::new(RawSplitTunnel::new(info, stream, sink)) as Box<dyn Tunnel>,
|
||||||
|
false,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let Some(msg1_cipher) = decode_noise_payload(first_packet.payload()) else {
|
||||||
|
let stream = Box::pin(futures::stream::once(async move { Ok(first_packet) }).chain(stream));
|
||||||
|
return Ok((
|
||||||
|
Box::new(RawSplitTunnel::new(info, stream, sink)) as Box<dyn Tunnel>,
|
||||||
|
false,
|
||||||
|
));
|
||||||
|
};
|
||||||
|
|
||||||
|
let params: NoiseParams = NOISE_PATTERN
|
||||||
|
.parse()
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("parse noise params failed: {e}")))?;
|
||||||
|
let mut state = Builder::new(params)
|
||||||
|
.prologue(NOISE_PROLOGUE)
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("set prologue failed: {e}")))?
|
||||||
|
.build_responder()
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("build responder failed: {e}")))?;
|
||||||
|
|
||||||
|
let mut msg1 = vec![0u8; 1024];
|
||||||
|
state
|
||||||
|
.read_message(msg1_cipher, &mut msg1)
|
||||||
|
.map_err(|e| TunnelError::InvalidPacket(format!("read noise msg1 failed: {e}")))?;
|
||||||
|
|
||||||
|
let mut msg2 = vec![0u8; 1024];
|
||||||
|
let msg2_len = state
|
||||||
|
.write_message(&[], &mut msg2)
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("write noise msg2 failed: {e}")))?;
|
||||||
|
sink.send(pack_control_packet(&encode_noise_payload(
|
||||||
|
&msg2[..msg2_len],
|
||||||
|
)))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let transport = state
|
||||||
|
.into_transport_mode()
|
||||||
|
.map_err(|e| TunnelError::InternalError(format!("switch transport mode failed: {e}")))?;
|
||||||
|
|
||||||
|
Ok((wrap_secure_tunnel(info, stream, sink, transport), true))
|
||||||
|
}
|
||||||
@@ -12,7 +12,10 @@ use crate::{
|
|||||||
api::manage::WebClientServiceServer,
|
api::manage::WebClientServiceServer,
|
||||||
rpc_impl::bidirect::BidirectRpcManager,
|
rpc_impl::bidirect::BidirectRpcManager,
|
||||||
rpc_types::controller::BaseController,
|
rpc_types::controller::BaseController,
|
||||||
web::{HeartbeatRequest, HeartbeatResponse, WebServerServiceClientFactory},
|
web::{
|
||||||
|
GetFeatureRequest, GetFeatureResponse, HeartbeatRequest, HeartbeatResponse,
|
||||||
|
WebServerServiceClientFactory,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
tunnel::Tunnel,
|
tunnel::Tunnel,
|
||||||
};
|
};
|
||||||
@@ -30,6 +33,7 @@ pub struct Session {
|
|||||||
controller: Arc<Controller>,
|
controller: Arc<Controller>,
|
||||||
|
|
||||||
heartbeat_ctx: HeartbeatCtx,
|
heartbeat_ctx: HeartbeatCtx,
|
||||||
|
heartbeat_started: std::sync::atomic::AtomicBool,
|
||||||
|
|
||||||
tasks: Mutex<JoinSet<()>>,
|
tasks: Mutex<JoinSet<()>>,
|
||||||
}
|
}
|
||||||
@@ -44,15 +48,18 @@ impl Session {
|
|||||||
"",
|
"",
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut tasks: JoinSet<()> = JoinSet::new();
|
let (tx, _rx1) = broadcast::channel(2);
|
||||||
let heartbeat_ctx =
|
let heartbeat_ctx = HeartbeatCtx {
|
||||||
Self::heartbeat_routine(&rpc_mgr, Arc::downgrade(&controller), &mut tasks);
|
notifier: Arc::new(tx),
|
||||||
|
resp: Arc::new(Mutex::new(None)),
|
||||||
|
};
|
||||||
|
|
||||||
Session {
|
Session {
|
||||||
rpc_mgr,
|
rpc_mgr,
|
||||||
controller,
|
controller,
|
||||||
heartbeat_ctx,
|
heartbeat_ctx,
|
||||||
tasks: Mutex::new(tasks),
|
heartbeat_started: std::sync::atomic::AtomicBool::new(false),
|
||||||
|
tasks: Mutex::new(JoinSet::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,14 +67,8 @@ impl Session {
|
|||||||
rpc_mgr: &BidirectRpcManager,
|
rpc_mgr: &BidirectRpcManager,
|
||||||
controller: Weak<Controller>,
|
controller: Weak<Controller>,
|
||||||
tasks: &mut JoinSet<()>,
|
tasks: &mut JoinSet<()>,
|
||||||
) -> HeartbeatCtx {
|
ctx: HeartbeatCtx,
|
||||||
let (tx, _rx1) = broadcast::channel(2);
|
) {
|
||||||
|
|
||||||
let ctx = HeartbeatCtx {
|
|
||||||
notifier: Arc::new(tx),
|
|
||||||
resp: Arc::new(Mutex::new(None)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mid = get_machine_id();
|
let mid = get_machine_id();
|
||||||
let inst_id = uuid::Uuid::new_v4();
|
let inst_id = uuid::Uuid::new_v4();
|
||||||
let token = controller.upgrade().unwrap().token();
|
let token = controller.upgrade().unwrap().token();
|
||||||
@@ -118,8 +119,22 @@ impl Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
ctx
|
pub async fn start_heartbeat(&self) {
|
||||||
|
if self
|
||||||
|
.heartbeat_started
|
||||||
|
.swap(true, std::sync::atomic::Ordering::AcqRel)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let mut tasks = self.tasks.lock().await;
|
||||||
|
Self::heartbeat_routine(
|
||||||
|
&self.rpc_mgr,
|
||||||
|
Arc::downgrade(&self.controller),
|
||||||
|
&mut tasks,
|
||||||
|
self.heartbeat_ctx.clone(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_routines(&self) {
|
async fn wait_routines(&self) {
|
||||||
@@ -135,6 +150,18 @@ impl Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_feature(
|
||||||
|
&self,
|
||||||
|
) -> Result<GetFeatureResponse, crate::proto::rpc_types::error::Error> {
|
||||||
|
let client = self
|
||||||
|
.rpc_mgr
|
||||||
|
.rpc_client()
|
||||||
|
.scoped_client::<WebServerServiceClientFactory<BaseController>>(1, 1, "".to_string());
|
||||||
|
client
|
||||||
|
.get_feature(BaseController::default(), GetFeatureRequest {})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn wait_next_heartbeat(&self) -> Option<HeartbeatResponse> {
|
pub async fn wait_next_heartbeat(&self) -> Option<HeartbeatResponse> {
|
||||||
let mut rx = self.heartbeat_ctx.notifier.subscribe();
|
let mut rx = self.heartbeat_ctx.notifier.subscribe();
|
||||||
rx.recv().await.ok()
|
rx.recv().await.ok()
|
||||||
|
|||||||
Reference in New Issue
Block a user