mirror of
https://github.com/EasyTier/EasyTier.git
synced 2026-05-07 10:14:35 +00:00
fix(ospf): mitigate route sync storm under connection flapping (#2063)
Addresses issue #2016 where nodes behind unstable networks (e.g. campus firewalls) cause excessive traffic that can freeze the remote node. Two changes in peer_ospf_route.rs:

- Make do_sync_route_info only trigger reverse sync_now when incoming data actually changed the route table or foreign network state. The previous unconditional sync_now created an A->B->A->B ping-pong cycle on every RPC exchange.
- Add exponential backoff (50ms..5s) to the session_task retry loop. The previous fixed 50ms retry produced ~20 RPCs/s during sustained network instability.
This commit is contained in:
@@ -659,7 +659,8 @@ impl SyncedRouteInfo {
         }
     }

-    fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) {
+    fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) -> bool {
+        let mut changed = false;
         for item in foreign_network.infos.iter().map(Clone::clone) {
             let Some(key) = item.key else {
                 continue;
@@ -675,10 +676,15 @@ impl SyncedRouteInfo {
                 .and_modify(|old_entry| {
                     if entry.version > old_entry.version {
                         *old_entry = entry.clone();
+                        changed = true;
                     }
                 })
-                .or_insert_with(|| entry.clone());
+                .or_insert_with(|| {
+                    changed = true;
+                    entry.clone()
+                });
         }
+        changed
     }

     fn update_my_peer_info(
@@ -2847,8 +2853,14 @@ impl RouteSessionManager {
         dst_peer_id: PeerId,
         mut sync_now: tokio::sync::broadcast::Receiver<()>,
     ) {
+        const RETRY_BASE_MS: u64 = 50;
+        const RETRY_MAX_MS: u64 = 5000;
+
         let mut last_sync = Instant::now();
         let mut last_clean_dst_saved_map = Instant::now();
+        // Keep retry_delay_ms across outer iterations so that rapid
+        // connect/disconnect flaps don't fully reset the backoff.
+        let mut retry_delay_ms = RETRY_BASE_MS;
         loop {
             loop {
                 let Some(service_impl) = service_impl.clone().upgrade() else {
@@ -2875,13 +2887,18 @@ impl RouteSessionManager {
                     last_clean_dst_saved_map = Instant::now();
                     service_impl.clean_dst_saved_map(dst_peer_id);
                 }
+                // Successful sync: decay backoff towards base so the next
+                // real failure still starts at a reasonable level, but
+                // don't fully reset to avoid 50ms bursts during flapping.
+                retry_delay_ms = (retry_delay_ms / 2).max(RETRY_BASE_MS);
                 break;
             }

             drop(service_impl);
             drop(peer_rpc);

-            tokio::time::sleep(Duration::from_millis(50)).await;
+            tokio::time::sleep(Duration::from_millis(retry_delay_ms)).await;
+            retry_delay_ms = (retry_delay_ms * 2).min(RETRY_MAX_MS);
         }

         sync_now = sync_now.resubscribe();
@@ -3214,17 +3231,18 @@ impl RouteSessionManager {
             service_impl.update_route_table_and_cached_local_conn_bitmap();
         }

+        let mut foreign_network_changed = false;
         if let Some(foreign_network) = &foreign_network {
             // Step 9b: credential peers' foreign_network_infos are always ignored
             if !from_is_credential {
-                service_impl
+                foreign_network_changed = service_impl
                     .synced_route_info
                     .update_foreign_network(foreign_network);
                 session.update_dst_saved_foreign_network_version(foreign_network, from_peer_id);
             }
         }

-        if need_update_route_table || foreign_network.is_some() {
+        if need_update_route_table || foreign_network_changed {
             service_impl.update_foreign_network_owner_map();
         }

@@ -3243,7 +3261,13 @@ impl RouteSessionManager {
             .disconnect_untrusted_peers(&untrusted_peers)
             .await;

-        self.sync_now("sync_route_info");
+        // Only trigger reverse sync when we actually received new data that
+        // needs to be propagated to other peers. Previously this was
+        // unconditional, which created an A→B→A→B ping-pong storm even when
+        // there was nothing new to propagate.
+        if need_update_route_table || foreign_network_changed {
+            self.sync_now("sync_route_info");
+        }

         Ok(SyncRouteInfoResponse {
             is_initiator,
|
|||||||
Reference in New Issue
Block a user