fix high cpu usage when client proto mismatch (#481)

before this patch, invalid packat received by tunnel reader may cause a dead loop in handshake.
This commit is contained in:
Sijie.Sun
2024-11-19 21:36:09 +08:00
committed by GitHub
parent 1324e6163e
commit 3f47f37470
2 changed files with 28 additions and 13 deletions
+16 -5
View File
@@ -133,15 +133,23 @@ impl PeerConn {
let mut locked = self.recv.lock().await; let mut locked = self.recv.lock().await;
let recv = locked.as_mut().unwrap(); let recv = locked.as_mut().unwrap();
let Some(rsp) = recv.next().await else { let rsp = match recv.next().await {
return Err(Error::WaitRespError( Some(Ok(rsp)) => rsp,
"conn closed during wait handshake response".to_owned(), Some(Err(e)) => {
)); return Err(Error::WaitRespError(format!(
"conn recv error during wait handshake response, err: {:?}",
e
)))
}
None => {
return Err(Error::WaitRespError(
"conn closed during wait handshake response".to_owned(),
))
}
}; };
*need_retry = true; *need_retry = true;
let rsp = rsp?;
let Some(peer_mgr_hdr) = rsp.peer_manager_header() else { let Some(peer_mgr_hdr) = rsp.peer_manager_header() else {
return Err(Error::WaitRespError(format!( return Err(Error::WaitRespError(format!(
"unexpected packet: {:?}, cannot decode peer manager hdr", "unexpected packet: {:?}, cannot decode peer manager hdr",
@@ -214,6 +222,9 @@ impl PeerConn {
Error::WaitRespError("send handshake request error".to_owned()) Error::WaitRespError("send handshake request error".to_owned())
})?; })?;
// yield to send the response packet
tokio::task::yield_now().await;
Ok(()) Ok(())
} }
+12 -8
View File
@@ -75,18 +75,12 @@ pin_project! {
#[pin] #[pin]
reader: R, reader: R,
buf: BytesMut, buf: BytesMut,
state: FrameReaderState,
max_packet_size: usize, max_packet_size: usize,
associate_data: Option<Box<dyn Any + Send + 'static>>, associate_data: Option<Box<dyn Any + Send + 'static>>,
error: Option<TunnelError>,
} }
} }
// usize means the size remaining to read
enum FrameReaderState {
ReadingHeader(usize),
ReadingBody(usize),
}
impl<R> FramedReader<R> { impl<R> FramedReader<R> {
pub fn new(reader: R, max_packet_size: usize) -> Self { pub fn new(reader: R, max_packet_size: usize) -> Self {
Self::new_with_associate_data(reader, max_packet_size, None) Self::new_with_associate_data(reader, max_packet_size, None)
@@ -100,9 +94,9 @@ impl<R> FramedReader<R> {
FramedReader { FramedReader {
reader, reader,
buf: BytesMut::with_capacity(max_packet_size), buf: BytesMut::with_capacity(max_packet_size),
state: FrameReaderState::ReadingHeader(4),
max_packet_size, max_packet_size,
associate_data, associate_data,
error: None,
} }
} }
@@ -146,9 +140,19 @@ where
let mut self_mut = self.project(); let mut self_mut = self.project();
loop { loop {
if let Some(e) = self_mut.error.as_ref() {
tracing::warn!("poll_next on a failed FramedReader, {:?}", e);
return Poll::Ready(None);
}
while let Some(packet) = while let Some(packet) =
Self::extract_one_packet(self_mut.buf, *self_mut.max_packet_size) Self::extract_one_packet(self_mut.buf, *self_mut.max_packet_size)
{ {
if let Err(TunnelError::InvalidPacket(msg)) = packet.as_ref() {
self_mut
.error
.replace(TunnelError::InvalidPacket(msg.clone()));
}
return Poll::Ready(Some(packet)); return Poll::Ready(Some(packet));
} }