improve performance of route generation (#914)

this may fix following problem:

1. cpu 100% when large number of nodes in network.
2. high cpu usage when large number of foreign networks.
3. packet loss when new node enters/exits.
4. old routes not cleand and show as an obloleted entry.
This commit is contained in:
Sijie.Sun
2025-06-02 20:12:27 +08:00
committed by GitHub
parent b5dfc7374c
commit 4608bca998
7 changed files with 474 additions and 219 deletions
+279 -213
View File
@@ -1,7 +1,6 @@
use std::{
collections::BTreeSet,
fmt::Debug,
hash::RandomState,
net::Ipv4Addr,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
@@ -13,9 +12,10 @@ use std::{
use crossbeam::atomic::AtomicCell;
use dashmap::DashMap;
use petgraph::{
algo::{all_simple_paths, astar, dijkstra},
graph::NodeIndex,
Directed, Graph,
algo::dijkstra,
graph::{Graph, NodeIndex},
visit::{EdgeRef, IntoNodeReferences},
Directed,
};
use prost::Message;
use prost_reflect::{DynamicMessage, ReflectMessage};
@@ -49,6 +49,7 @@ use crate::{
};
use super::{
graph_algo::dijkstra_with_first_hop,
peer_rpc::PeerRpcManager,
route_trait::{
DefaultRouteCostCalculator, ForeignNetworkRouteInfoMap, NextHopPolicy, RouteCostCalculator,
@@ -60,7 +61,8 @@ use super::{
static SERVICE_ID: u32 = 7;
static UPDATE_PEER_INFO_PERIOD: Duration = Duration::from_secs(3600);
static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660);
static AVOID_RELAY_COST: i32 = i32::MAX / 512;
// the cost (latency between two peers) is i32, i32::MAX is large enough.
static AVOID_RELAY_COST: usize = i32::MAX as usize;
type Version = u32;
@@ -80,14 +82,12 @@ impl AtomicVersion {
self.0.store(version, Ordering::Relaxed);
}
fn inc(&self) {
self.0.fetch_add(1, Ordering::Relaxed);
fn inc(&self) -> Version {
self.0.fetch_add(1, Ordering::Relaxed) + 1
}
fn set_if_larger(&self, version: Version) {
if self.get() < version {
self.set(version);
}
self.0.fetch_max(version, Ordering::Relaxed);
}
}
@@ -283,13 +283,25 @@ impl RouteConnBitmap {
type Error = SyncRouteInfoError;
// constructed with all infos synced from all peers.
#[derive(Debug)]
struct SyncedRouteInfo {
peer_infos: DashMap<PeerId, RoutePeerInfo>,
// prost doesn't support unknown fields, so we use DynamicMessage to store raw infos and progate them to other peers.
raw_peer_infos: DashMap<PeerId, DynamicMessage>,
conn_map: DashMap<PeerId, (BTreeSet<PeerId>, AtomicVersion)>,
foreign_network: DashMap<ForeignNetworkRouteInfoKey, ForeignNetworkRouteInfoEntry>,
version: AtomicVersion,
}
impl Debug for SyncedRouteInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SyncedRouteInfo")
.field("peer_infos", &self.peer_infos)
.field("conn_map", &self.conn_map)
.field("foreign_network", &self.foreign_network)
.field("version", &self.version.get())
.finish()
}
}
impl SyncedRouteInfo {
@@ -305,17 +317,24 @@ impl SyncedRouteInfo {
self.raw_peer_infos.remove(&peer_id);
self.conn_map.remove(&peer_id);
self.foreign_network.retain(|k, _| k.peer_id != peer_id);
self.version.inc();
}
fn fill_empty_peer_info(&self, peer_ids: &BTreeSet<PeerId>) {
let mut need_inc_version = false;
for peer_id in peer_ids {
self.peer_infos
.entry(*peer_id)
.or_insert_with(|| RoutePeerInfo::new());
self.peer_infos.entry(*peer_id).or_insert_with(|| {
need_inc_version = true;
RoutePeerInfo::new()
});
self.conn_map
.entry(*peer_id)
.or_insert_with(|| (BTreeSet::new(), AtomicVersion::new()));
self.conn_map.entry(*peer_id).or_insert_with(|| {
need_inc_version = true;
(BTreeSet::new(), AtomicVersion::new())
});
}
if need_inc_version {
self.version.inc();
}
}
@@ -377,6 +396,7 @@ impl SyncedRouteInfo {
peer_infos: &Vec<RoutePeerInfo>,
raw_peer_infos: &Vec<DynamicMessage>,
) -> Result<(), Error> {
let mut need_inc_version = false;
for (idx, route_info) in peer_infos.iter().enumerate() {
let mut route_info = route_info.clone();
let raw_route_info = &raw_peer_infos[idx];
@@ -410,20 +430,27 @@ impl SyncedRouteInfo {
self.raw_peer_infos
.insert(route_info.peer_id, raw_route_info.clone());
*old_entry = route_info.clone();
need_inc_version = true;
}
})
.or_insert_with(|| {
need_inc_version = true;
self.raw_peer_infos
.insert(route_info.peer_id, raw_route_info.clone());
route_info.clone()
});
}
if need_inc_version {
self.version.inc();
}
Ok(())
}
fn update_conn_map(&self, conn_bitmap: &RouteConnBitmap) {
self.fill_empty_peer_info(&conn_bitmap.peer_ids.iter().map(|x| x.0).collect());
let mut need_inc_version = false;
for (peer_idx, (peer_id, version)) in conn_bitmap.peer_ids.iter().enumerate() {
assert!(self.peer_infos.contains_key(peer_id));
let connceted_peers = conn_bitmap.get_connected_peers(peer_idx);
@@ -434,19 +461,25 @@ impl SyncedRouteInfo {
.and_modify(|(old_conn_bitmap, old_version)| {
if *version > old_version.get() {
*old_conn_bitmap = conn_bitmap.get_connected_peers(peer_idx);
need_inc_version = true;
old_version.set(*version);
}
})
.or_insert_with(|| {
need_inc_version = true;
(
conn_bitmap.get_connected_peers(peer_idx),
version.clone().into(),
)
});
}
if need_inc_version {
self.version.inc();
}
}
fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) {
let mut need_inc_version = false;
for item in foreign_network.infos.iter().map(Clone::clone) {
let Some(key) = item.key else {
continue;
@@ -461,10 +494,14 @@ impl SyncedRouteInfo {
.entry(key.clone())
.and_modify(|old_entry| {
if entry.version > old_entry.version {
need_inc_version = true;
*old_entry = entry.clone();
}
})
.or_insert_with(|| entry.clone());
.or_insert_with(|| {
need_inc_version = true;
entry.clone()
});
}
}
@@ -483,7 +520,12 @@ impl SyncedRouteInfo {
let old_version = old.version;
*old = new;
new_version != old_version
if new_version != old_version {
self.version.inc();
true
} else {
false
}
}
fn update_my_conn_info(&self, my_peer_id: PeerId, connected_peers: BTreeSet<PeerId>) -> bool {
@@ -499,6 +541,7 @@ impl SyncedRouteInfo {
} else {
let _ = std::mem::replace(&mut my_conn_info.value_mut().0, connected_peers);
my_conn_info.value().1.inc();
self.version.inc();
true
}
}
@@ -557,6 +600,10 @@ impl SyncedRouteInfo {
updated = true;
}
if updated {
self.version.inc();
}
updated
}
@@ -573,13 +620,14 @@ impl SyncedRouteInfo {
}
}
type PeerGraph = Graph<PeerId, i32, Directed>;
type PeerGraph = Graph<PeerId, usize, Directed>;
type PeerIdToNodexIdxMap = DashMap<PeerId, NodeIndex>;
#[derive(Debug, Clone, Copy)]
struct NextHopInfo {
next_hop_peer_id: PeerId,
path_latency: i32,
path_len: usize, // path includes src and dst.
version: Version,
}
// dst_peer_id -> (next_hop_peer_id, cost, path_len)
type NextHopMap = DashMap<PeerId, NextHopInfo>;
@@ -591,6 +639,7 @@ struct RouteTable {
next_hop_map: NextHopMap,
ipv4_peer_id_map: DashMap<Ipv4Addr, PeerId>,
cidr_peer_id_map: DashMap<cidr::IpCidr, PeerId>,
next_hop_map_version: AtomicVersion,
}
impl RouteTable {
@@ -600,15 +649,23 @@ impl RouteTable {
next_hop_map: DashMap::new(),
ipv4_peer_id_map: DashMap::new(),
cidr_peer_id_map: DashMap::new(),
next_hop_map_version: AtomicVersion::new(),
}
}
fn get_next_hop(&self, dst_peer_id: PeerId) -> Option<NextHopInfo> {
self.next_hop_map.get(&dst_peer_id).map(|x| *x)
let cur_version = self.next_hop_map_version.get();
self.next_hop_map.get(&dst_peer_id).and_then(|x| {
if x.version >= cur_version {
Some(*x)
} else {
None
}
})
}
fn peer_reachable(&self, peer_id: PeerId) -> bool {
self.next_hop_map.contains_key(&peer_id)
self.get_next_hop(peer_id).is_some()
}
fn get_nat_type(&self, peer_id: PeerId) -> Option<NatType> {
@@ -617,158 +674,16 @@ impl RouteTable {
.map(|x| NatType::try_from(x.udp_stun_info as i32).unwrap_or_default())
}
// return graph and start node index (node of my peer id).
fn build_peer_graph_from_synced_info<T: RouteCostCalculatorInterface>(
peers: Vec<PeerId>,
synced_info: &SyncedRouteInfo,
cost_calc: &mut T,
) -> (PeerGraph, PeerIdToNodexIdxMap) {
let mut graph: PeerGraph = Graph::new();
let peer_id_to_node_index = PeerIdToNodexIdxMap::new();
for peer_id in peers.iter() {
peer_id_to_node_index.insert(*peer_id, graph.add_node(*peer_id));
}
for peer_id in peers.iter() {
let connected_peers = synced_info
.get_connected_peers(*peer_id)
.unwrap_or(BTreeSet::new());
// if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST.
let peer_avoid_relay_data = synced_info.get_avoid_relay_data(*peer_id);
for dst_peer_id in connected_peers.iter() {
let Some(dst_idx) = peer_id_to_node_index.get(dst_peer_id) else {
continue;
};
graph.add_edge(
*peer_id_to_node_index.get(&peer_id).unwrap(),
*dst_idx,
if peer_avoid_relay_data {
AVOID_RELAY_COST
} else {
cost_calc.calculate_cost(*peer_id, *dst_peer_id)
},
);
}
}
(graph, peer_id_to_node_index)
}
fn gen_next_hop_map_with_least_hop<T: RouteCostCalculatorInterface>(
my_peer_id: PeerId,
graph: &PeerGraph,
idx_map: &PeerIdToNodexIdxMap,
cost_calc: &mut T,
) -> NextHopMap {
let res = dijkstra(&graph, *idx_map.get(&my_peer_id).unwrap(), None, |_| 1);
let next_hop_map = NextHopMap::new();
for (node_idx, cost) in res.iter() {
if *cost == 0 {
continue;
}
let mut all_paths = all_simple_paths::<Vec<_>, _, RandomState>(
graph,
*idx_map.get(&my_peer_id).unwrap(),
*node_idx,
*cost - 1,
Some(*cost + 1), // considering having avoid relay, the max cost could be a bit larger.
)
.collect::<Vec<_>>();
assert!(!all_paths.is_empty());
all_paths.sort_by(|a, b| a.len().cmp(&b.len()));
// find a path with least cost.
let mut min_cost = i32::MAX;
let mut min_path_len = usize::MAX;
let mut min_path = Vec::new();
for path in all_paths.iter() {
if min_path_len < path.len() && min_cost < AVOID_RELAY_COST {
// the min path does not contain avoid relay node.
break;
}
let mut cost = 0;
for i in 0..path.len() - 1 {
let src_peer_id = *graph.node_weight(path[i]).unwrap();
let dst_peer_id = *graph.node_weight(path[i + 1]).unwrap();
let edge_weight = *graph
.edge_weight(graph.find_edge(path[i], path[i + 1]).unwrap())
.unwrap();
if edge_weight != 1 {
// means avoid relay.
cost += edge_weight;
} else {
cost += cost_calc.calculate_cost(src_peer_id, dst_peer_id);
}
}
if cost <= min_cost {
min_cost = cost;
min_path = path.clone();
min_path_len = path.len();
}
}
next_hop_map.insert(
*graph.node_weight(*node_idx).unwrap(),
NextHopInfo {
next_hop_peer_id: *graph.node_weight(min_path[1]).unwrap(),
path_latency: min_cost,
path_len: min_path_len,
},
);
}
next_hop_map
}
fn gen_next_hop_map_with_least_cost(
my_peer_id: PeerId,
graph: &PeerGraph,
idx_map: &PeerIdToNodexIdxMap,
) -> NextHopMap {
let next_hop_map = NextHopMap::new();
for item in idx_map.iter() {
if *item.key() == my_peer_id {
continue;
}
let dst_peer_node_idx = *item.value();
let Some((cost, path)) = astar::astar(
graph,
*idx_map.get(&my_peer_id).unwrap(),
|node_idx| node_idx == dst_peer_node_idx,
|e| *e.weight(),
|_| 0,
) else {
continue;
};
next_hop_map.insert(
*item.key(),
NextHopInfo {
next_hop_peer_id: *graph.node_weight(path[1]).unwrap(),
path_latency: cost,
path_len: path.len(),
},
);
}
next_hop_map
}
fn build_from_synced_info<T: RouteCostCalculatorInterface>(
&self,
my_peer_id: PeerId,
synced_info: &SyncedRouteInfo,
policy: NextHopPolicy,
mut cost_calc: T,
) {
// build peer_infos
self.peer_infos.clear();
cost_calc: &T,
) -> (PeerGraph, NodeIndex) {
let mut graph: PeerGraph = PeerGraph::new();
let mut start_node_idx = None;
let peer_id_to_node_index: PeerIdToNodexIdxMap = DashMap::new();
for item in synced_info.peer_infos.iter() {
let peer_id = item.key();
let info = item.value();
@@ -777,49 +692,175 @@ impl RouteTable {
continue;
}
self.peer_infos.insert(*peer_id, info.clone());
let node_idx = graph.add_node(*peer_id);
peer_id_to_node_index.insert(*peer_id, node_idx);
if *peer_id == my_peer_id {
start_node_idx = Some(node_idx);
}
}
if self.peer_infos.is_empty() {
if start_node_idx.is_none() {
return (graph, NodeIndex::end());
}
for item in peer_id_to_node_index.iter() {
let src_peer_id = item.key();
let src_node_idx = item.value();
let connected_peers = synced_info
.get_connected_peers(*src_peer_id)
.unwrap_or(BTreeSet::new());
// if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST.
let peer_avoid_relay_data = synced_info.get_avoid_relay_data(*src_peer_id);
for dst_peer_id in connected_peers.iter() {
let Some(dst_node_idx) = peer_id_to_node_index.get(dst_peer_id) else {
continue;
};
let mut cost = cost_calc.calculate_cost(*src_peer_id, *dst_peer_id) as usize;
if peer_avoid_relay_data {
cost += AVOID_RELAY_COST;
}
graph.add_edge(*src_node_idx, *dst_node_idx, cost);
}
}
(graph, start_node_idx.unwrap())
}
fn clean_expired_route_info(&self) {
let cur_version = self.next_hop_map_version.get();
self.next_hop_map.retain(|_, v| {
// remove next hop map for peers we cannot reach.
v.version >= cur_version
});
self.peer_infos.retain(|k, _| {
// remove peer info for peers we cannot reach.
self.next_hop_map.contains_key(k)
});
self.ipv4_peer_id_map.retain(|_, v| {
// remove ipv4 map for peers we cannot reach.
self.next_hop_map.contains_key(v)
});
self.cidr_peer_id_map.retain(|_, v| {
// remove cidr map for peers we cannot reach.
self.next_hop_map.contains_key(v)
});
}
fn gen_next_hop_map_with_least_hop(
&self,
graph: &PeerGraph,
start_node: &NodeIndex,
version: Version,
) {
let normalize_edge_cost = |e: petgraph::graph::EdgeReference<usize>| {
if *e.weight() >= AVOID_RELAY_COST {
AVOID_RELAY_COST + 1
} else {
1
}
};
// Step 1: 第一次 Dijkstra - 计算最短跳数
let path_len_map = dijkstra(&graph, *start_node, None, normalize_edge_cost);
// Step 2: 构建最短跳数子图(只保留属于最短路径和 AVOID RELAY 的边)
let mut subgraph: PeerGraph = PeerGraph::new();
let mut start_node_idx = None;
for (node_idx, peer_id) in graph.node_references() {
let new_node_idx = subgraph.add_node(*peer_id);
if node_idx == *start_node {
start_node_idx = Some(new_node_idx);
}
}
for edge in graph.edge_references() {
let (src, tgt) = graph.edge_endpoints(edge.id()).unwrap();
let Some(src_path_len) = path_len_map.get(&src) else {
continue;
};
let Some(tgt_path_len) = path_len_map.get(&tgt) else {
continue;
};
if *src_path_len + normalize_edge_cost(edge) == *tgt_path_len {
subgraph.add_edge(src, tgt, *edge.weight());
}
}
// Step 3: 第二次 Dijkstra - 在子图上找代价最小的路径
self.gen_next_hop_map_with_least_cost(&subgraph, &start_node_idx.clone().unwrap(), version);
}
fn gen_next_hop_map_with_least_cost(
&self,
graph: &PeerGraph,
start_node: &NodeIndex,
version: Version,
) {
let (costs, next_hops) = dijkstra_with_first_hop(&graph, *start_node, |e| *e.weight());
for (dst, (next_hop, path_len)) in next_hops.iter() {
let info = NextHopInfo {
next_hop_peer_id: *graph.node_weight(*next_hop).unwrap(),
path_latency: (*costs.get(dst).unwrap() % AVOID_RELAY_COST) as i32,
path_len: *path_len as usize,
version,
};
let dst_peer_id = *graph.node_weight(*dst).unwrap();
self.next_hop_map
.entry(dst_peer_id)
.and_modify(|x| {
if x.version < version {
*x = info;
}
})
.or_insert(info);
}
self.next_hop_map_version.set_if_larger(version);
}
fn build_from_synced_info<T: RouteCostCalculatorInterface>(
&self,
my_peer_id: PeerId,
synced_info: &SyncedRouteInfo,
policy: NextHopPolicy,
cost_calc: &T,
) {
let version = synced_info.version.get();
// build next hop map
let (graph, start_node) =
Self::build_peer_graph_from_synced_info(my_peer_id, &synced_info, cost_calc);
if graph.node_count() == 0 {
tracing::warn!("no peer in graph, cannot build next hop map");
return;
}
// build next hop map
self.next_hop_map.clear();
self.next_hop_map.insert(
my_peer_id,
NextHopInfo {
next_hop_peer_id: my_peer_id,
path_latency: 0,
path_len: 1,
},
);
let (graph, idx_map) = Self::build_peer_graph_from_synced_info(
self.peer_infos.iter().map(|x| *x.key()).collect(),
&synced_info,
&mut cost_calc,
);
let next_hop_map = if matches!(policy, NextHopPolicy::LeastHop) {
Self::gen_next_hop_map_with_least_hop(my_peer_id, &graph, &idx_map, &mut cost_calc)
if matches!(policy, NextHopPolicy::LeastHop) {
self.gen_next_hop_map_with_least_hop(&graph, &start_node, version);
} else {
Self::gen_next_hop_map_with_least_cost(my_peer_id, &graph, &idx_map)
self.gen_next_hop_map_with_least_cost(&graph, &start_node, version);
};
for item in next_hop_map.iter() {
self.next_hop_map.insert(*item.key(), *item.value());
}
// build graph
// build ipv4_peer_id_map, cidr_peer_id_map
self.ipv4_peer_id_map.clear();
self.cidr_peer_id_map.clear();
for item in self.peer_infos.iter() {
// only set ipv4 map for peers we can reach.
if !self.next_hop_map.contains_key(item.key()) {
// build peer_infos, ipv4_peer_id_map, cidr_peer_id_map
// only set map for peers we can reach.
for item in self.next_hop_map.iter() {
if item.version < version {
// skip if the next hop entry is outdated. (peer is unreachable)
continue;
}
let peer_id = item.key();
let info = item.value();
let Some(info) = synced_info.peer_infos.get(peer_id) else {
continue;
};
self.peer_infos.insert(*peer_id, info.clone());
if let Some(ipv4_addr) = info.ipv4_addr {
self.ipv4_peer_id_map.insert(ipv4_addr.into(), *peer_id);
@@ -1022,7 +1063,7 @@ struct PeerRouteServiceImpl {
interface: Mutex<Option<RouteInterfaceBox>>,
cost_calculator: std::sync::Mutex<Option<RouteCostCalculator>>,
cost_calculator: std::sync::RwLock<Option<RouteCostCalculator>>,
route_table: RouteTable,
route_table_with_cost: RouteTable,
foreign_network_owner_map: DashMap<NetworkIdentity, Vec<PeerId>>,
@@ -1063,7 +1104,7 @@ impl PeerRouteServiceImpl {
interface: Mutex::new(None),
cost_calculator: std::sync::Mutex::new(Some(Box::new(DefaultRouteCostCalculator))),
cost_calculator: std::sync::RwLock::new(Some(Box::new(DefaultRouteCostCalculator))),
route_table: RouteTable::new(),
route_table_with_cost: RouteTable::new(),
@@ -1074,6 +1115,7 @@ impl PeerRouteServiceImpl {
raw_peer_infos: DashMap::new(),
conn_map: DashMap::new(),
foreign_network: DashMap::new(),
version: AtomicVersion::new(),
},
cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::new()),
@@ -1171,23 +1213,37 @@ impl PeerRouteServiceImpl {
}
fn update_route_table(&self) {
let mut calc_locked = self.cost_calculator.lock().unwrap();
self.cost_calculator
.write()
.unwrap()
.as_mut()
.unwrap()
.begin_update();
let calc_locked = self.cost_calculator.read().unwrap();
calc_locked.as_mut().unwrap().begin_update();
self.route_table.build_from_synced_info(
self.my_peer_id,
&self.synced_route_info,
NextHopPolicy::LeastHop,
calc_locked.as_mut().unwrap(),
calc_locked.as_ref().unwrap(),
);
self.route_table_with_cost.build_from_synced_info(
self.my_peer_id,
&self.synced_route_info,
NextHopPolicy::LeastCost,
calc_locked.as_mut().unwrap(),
calc_locked.as_ref().unwrap(),
);
calc_locked.as_mut().unwrap().end_update();
drop(calc_locked);
self.cost_calculator
.write()
.unwrap()
.as_mut()
.unwrap()
.end_update();
}
fn update_foreign_network_owner_map(&self) {
@@ -1221,7 +1277,7 @@ impl PeerRouteServiceImpl {
fn cost_calculator_need_update(&self) -> bool {
self.cost_calculator
.lock()
.read()
.unwrap()
.as_ref()
.map(|x| x.need_update())
@@ -1411,6 +1467,9 @@ impl PeerRouteServiceImpl {
for p in to_remove.iter() {
self.synced_route_info.foreign_network.remove(p);
}
self.route_table.clean_expired_route_info();
self.route_table_with_cost.clean_expired_route_info();
}
fn build_sync_route_raw_req(
@@ -2022,6 +2081,7 @@ impl PeerRoute {
if service_impl.cost_calculator_need_update() {
tracing::debug!("cost_calculator_need_update");
service_impl.synced_route_info.version.inc();
service_impl.update_route_table();
}
@@ -2136,7 +2196,7 @@ impl Route for PeerRoute {
let next_hop_peer_latency_first = route_table_with_cost.get_next_hop(*item.key());
let mut route: crate::proto::cli::Route = item.value().clone().into();
route.next_hop_peer_id = next_hop_peer.next_hop_peer_id;
route.cost = (next_hop_peer.path_len - 1) as i32;
route.cost = next_hop_peer.path_len as i32;
route.path_latency = next_hop_peer.path_latency;
route.next_hop_peer_id_latency_first =
@@ -2166,7 +2226,8 @@ impl Route for PeerRoute {
}
async fn set_route_cost_fn(&self, _cost_fn: RouteCostCalculator) {
*self.service_impl.cost_calculator.lock().unwrap() = Some(_cost_fn);
*self.service_impl.cost_calculator.write().unwrap() = Some(_cost_fn);
self.service_impl.synced_route_info.version.inc();
self.service_impl.update_route_table();
}
@@ -2307,7 +2368,10 @@ mod tests {
for r in vec![r_a.clone(), r_b.clone()].iter() {
wait_for_condition(
|| async { r.list_routes().await.len() == 1 },
|| async {
println!("route: {:?}", r.list_routes().await);
r.list_routes().await.len() == 1
},
Duration::from_secs(5),
)
.await;
@@ -2348,6 +2412,8 @@ mod tests {
assert_eq!(i_a.0, i_b.1);
assert_eq!(i_b.0, i_a.1);
println!("after drop p_b, r_b");
drop(r_b);
drop(p_b);