feat(web): full-power RPC access + typed JSON proxy endpoint (#1983)

- extend web controller bindings to cover full RPC service set
- update rpc_service API wiring and session/controller integration
- generate trait-level json_call_method in rpc codegen
- route restful proxy-rpc requests via scoped typed clients
- add json-call regression tests and required Sync bound fixes~
This commit is contained in:
KKRainbow
2026-03-11 20:32:37 +08:00
committed by GitHub
parent 80043df292
commit 330659e449
14 changed files with 546 additions and 61 deletions
@@ -342,7 +342,8 @@ impl PunchSymToConeHoleClient {
async fn get_rpc_stub(
&self,
dst_peer_id: PeerId,
) -> Box<dyn UdpHolePunchRpc<Controller = BaseController> + std::marker::Send + 'static> {
) -> Box<dyn UdpHolePunchRpc<Controller = BaseController> + std::marker::Send + Sync + 'static>
{
self.peer_mgr
.get_peer_rpc_mgr()
.rpc_client()
+96
View File
@@ -260,3 +260,99 @@ pub mod logger {
pub mod manage {
include!(concat!(env!("OUT_DIR"), "/api.manage.rs"));
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use prost::Message;
use super::manage::{
ListNetworkInstanceRequest, ListNetworkInstanceResponse, WebClientService,
WebClientServiceClient, WebClientServiceDescriptor, WebClientServiceMethodDescriptor,
};
use crate::proto::common::Uuid;
use crate::proto::rpc_types::controller::BaseController;
use crate::proto::rpc_types::descriptor::ServiceDescriptor;
use crate::proto::rpc_types::error::Error;
use crate::proto::rpc_types::handler::Handler;
#[derive(Clone, Default)]
struct WebClientServiceJsonCallHandler;
#[async_trait::async_trait]
impl Handler for WebClientServiceJsonCallHandler {
type Descriptor = WebClientServiceDescriptor;
type Controller = BaseController;
async fn call(
&self,
_ctrl: Self::Controller,
method: <Self::Descriptor as ServiceDescriptor>::Method,
input: Bytes,
) -> crate::proto::rpc_types::error::Result<Bytes> {
match method {
WebClientServiceMethodDescriptor::ListNetworkInstance => {
let _req = ListNetworkInstanceRequest::decode(input.as_ref()).unwrap();
let resp = ListNetworkInstanceResponse {
inst_ids: vec![Uuid {
part1: 1,
part2: 2,
part3: 3,
part4: 4,
}],
};
Ok(Bytes::from(resp.encode_to_vec()))
}
_ => Err(Error::ExecutionError(anyhow::anyhow!(
"unsupported method in test handler"
))),
}
}
}
#[tokio::test]
async fn web_client_service_call_json_method_supports_snake_and_proto_method_name() {
let client = WebClientServiceClient::new(WebClientServiceJsonCallHandler);
let snake_result = client
.json_call_method(
BaseController::default(),
"list_network_instance",
serde_json::json!({}),
)
.await
.unwrap();
assert_eq!(
snake_result["inst_ids"][0],
serde_json::json!({
"part1": 1,
"part2": 2,
"part3": 3,
"part4": 4
})
);
let proto_result = client
.json_call_method(
BaseController::default(),
"ListNetworkInstance",
serde_json::json!({}),
)
.await
.unwrap();
assert_eq!(proto_result["inst_ids"].as_array().unwrap().len(), 1);
}
#[tokio::test]
async fn web_client_service_call_json_method_rejects_unknown_method() {
let client = WebClientServiceClient::new(WebClientServiceJsonCallHandler);
let ret = client
.json_call_method(
BaseController::default(),
"not_exist_method",
serde_json::json!({}),
)
.await;
assert!(ret.is_err());
}
}
+95
View File
@@ -7,6 +7,101 @@ use tokio::task::JoinSet;
use super::rpc_impl::RpcController;
#[derive(Clone, Default)]
struct GreetingJsonCallHandler;
#[async_trait::async_trait]
impl crate::proto::rpc_types::handler::Handler for GreetingJsonCallHandler {
type Descriptor = GreetingDescriptor;
type Controller = crate::proto::rpc_types::controller::BaseController;
async fn call(
&self,
_ctrl: Self::Controller,
method: <Self::Descriptor as crate::proto::rpc_types::descriptor::ServiceDescriptor>::Method,
input: bytes::Bytes,
) -> crate::proto::rpc_types::error::Result<bytes::Bytes> {
use prost::Message;
match method {
GreetingMethodDescriptor::SayHello => {
let req = SayHelloRequest::decode(input)?;
let resp = SayHelloResponse {
greeting: format!("Hello {}!", req.name),
};
Ok(bytes::Bytes::from(resp.encode_to_vec()))
}
GreetingMethodDescriptor::SayGoodbye => {
let req = SayGoodbyeRequest::decode(input)?;
let resp = SayGoodbyeResponse {
greeting: format!("Goodbye, {}!", req.name),
};
Ok(bytes::Bytes::from(resp.encode_to_vec()))
}
}
}
}
#[tokio::test]
async fn greeting_client_json_call_method_supports_snake_and_proto_method_name() {
let client = GreetingClient::new(GreetingJsonCallHandler);
let snake = client
.json_call_method(
crate::proto::rpc_types::controller::BaseController::default(),
"say_hello",
serde_json::json!({"name": "world"}),
)
.await
.unwrap();
assert_eq!(snake["greeting"], serde_json::json!("Hello world!"));
let proto = client
.json_call_method(
crate::proto::rpc_types::controller::BaseController::default(),
"SayHello",
serde_json::json!({"name": "world"}),
)
.await
.unwrap();
assert_eq!(proto["greeting"], serde_json::json!("Hello world!"));
}
#[tokio::test]
async fn greeting_client_json_call_method_rejects_invalid_json() {
let client = GreetingClient::new(GreetingJsonCallHandler);
let err = client
.json_call_method(
crate::proto::rpc_types::controller::BaseController::default(),
"say_hello",
serde_json::json!({"name": 123}),
)
.await
.unwrap_err();
assert!(matches!(
err,
crate::proto::rpc_types::error::Error::MalformatRpcPacket(_)
));
}
#[tokio::test]
async fn greeting_client_json_call_method_rejects_unknown_method() {
let client = GreetingClient::new(GreetingJsonCallHandler);
let err = client
.json_call_method(
crate::proto::rpc_types::controller::BaseController::default(),
"not_exist_method",
serde_json::json!({"name": "world"}),
)
.await
.unwrap_err();
assert!(matches!(
err,
crate::proto::rpc_types::error::Error::InvalidMethodIndex(0, _)
));
}
#[derive(Clone)]
pub struct GreetingService {
pub delay_ms: u64,
+5 -4
View File
@@ -31,7 +31,7 @@ use crate::{
stats::StatsRpcService, vpn_portal::VpnPortalRpcService,
},
tunnel::{tcp::TcpTunnelListener, TunnelListener},
web_client::DefaultHooks,
web_client::{DefaultHooks, WebClientHooks},
};
pub struct ApiRpcServer<T: TunnelListener + 'static> {
@@ -64,7 +64,7 @@ impl ApiRpcServer<TcpTunnelListener> {
impl<T: TunnelListener + 'static> ApiRpcServer<T> {
pub fn from_tunnel(tunnel: T, instance_manager: Arc<NetworkInstanceManager>) -> Self {
let rpc_server = StandAloneServer::new(tunnel);
register_api_rpc_service(&instance_manager, rpc_server.registry());
register_api_rpc_service(&instance_manager, rpc_server.registry(), None);
Self { rpc_server }
}
}
@@ -87,9 +87,10 @@ impl<T: TunnelListener + 'static> Drop for ApiRpcServer<T> {
}
}
fn register_api_rpc_service(
pub fn register_api_rpc_service(
instance_manager: &Arc<NetworkInstanceManager>,
registry: &ServiceRegistry,
hooks: Option<Arc<dyn WebClientHooks>>,
) {
registry.register(
PeerManageRpcServer::new(PeerManageRpcService::new(instance_manager.clone())),
@@ -148,7 +149,7 @@ fn register_api_rpc_service(
registry.register(
WebClientServiceServer::new(InstanceManageRpcService::new(
instance_manager.clone(),
Arc::new(DefaultHooks),
hooks.unwrap_or(Arc::new(DefaultHooks)),
)),
"",
);
+1 -1
View File
@@ -1,5 +1,4 @@
mod acl_manage;
mod api;
mod config;
mod connector_manage;
mod credential_manage;
@@ -11,6 +10,7 @@ mod proxy;
mod stats;
mod vpn_portal;
pub mod api;
pub mod instance_manage;
pub mod logger;
pub mod remote_client;
+4 -4
View File
@@ -1,8 +1,8 @@
use std::sync::Arc;
use crate::{
instance_manager::NetworkInstanceManager,
rpc_service::instance_manage::InstanceManageRpcService, web_client::WebClientHooks,
instance_manager::NetworkInstanceManager, proto::rpc_impl::service_registry::ServiceRegistry,
rpc_service::api::register_api_rpc_service, web_client::WebClientHooks,
};
pub struct Controller {
@@ -39,8 +39,8 @@ impl Controller {
self.hostname.clone()
}
pub fn get_rpc_service(&self) -> InstanceManageRpcService {
InstanceManageRpcService::new(self.manager.clone(), self.hooks.clone())
pub fn register_api_rpc_service(&self, registry: &ServiceRegistry) {
register_api_rpc_service(&self.manager, registry, Some(self.hooks.clone()));
}
pub(super) fn notify_manager_stopping(&self) {
+1 -5
View File
@@ -9,7 +9,6 @@ use tokio::{
use crate::{
common::{constants::EASYTIER_VERSION, get_machine_id},
proto::{
api::manage::WebClientServiceServer,
rpc_impl::bidirect::BidirectRpcManager,
rpc_types::controller::BaseController,
web::{
@@ -43,10 +42,7 @@ impl Session {
let rpc_mgr = BidirectRpcManager::new();
rpc_mgr.run_with_tunnel(tunnel);
rpc_mgr.rpc_server().registry().register(
WebClientServiceServer::new(controller.get_rpc_service()),
"",
);
controller.register_api_rpc_service(rpc_mgr.rpc_server().registry());
let (tx, _rx1) = broadcast::channel(2);
let heartbeat_ctx = HeartbeatCtx {