fix: refresh ACL groups and enable TCP_NODELAY for WebSocket (#2118)

* fix: refresh ACL groups and enable TCP_NODELAY for WebSocket
* add remove_peers to remove a list of peer ids from the ospf route table
* fix secure tunnel over unreliable UDP tunnels
* fix(web-client): timeout secure tunnel handshake
* fix(web-server): tolerate delayed secure hello
* fix quic endpoint panic
* fix replay check
This commit is contained in:
KKRainbow
2026-04-19 10:37:39 +08:00
committed by GitHub
parent c49c56612b
commit 2db655bd6d
14 changed files with 7824 additions and 1038 deletions
+46 -788
View File
@@ -1,26 +1,14 @@
use std::{
sync::{
Arc, Mutex, RwLock,
atomic::{AtomicBool, AtomicU32, Ordering},
},
time::{SystemTime, UNIX_EPOCH},
use std::sync::{
Arc, RwLock,
atomic::{AtomicBool, Ordering},
};
use atomic_shim::AtomicU64;
use crate::{
common::PeerId,
peers::encrypt::{Encryptor, create_encryptor},
tunnel::packet_def::{StandardAeadTail, ZCPacket},
};
use anyhow::anyhow;
use dashmap::DashMap;
use hmac::{Hmac, Mac as _};
use rand::RngCore as _;
use sha2::Sha256;
use zerocopy::FromBytes;
type HmacSha256 = Hmac<Sha256>;
use super::secure_datagram::{SecureDatagramDirection, SecureDatagramSession};
use crate::{common::PeerId, tunnel::packet_def::ZCPacket};
/// Result of upserting a responder-side session into the store.
///
/// NOTE(review): semantics inferred from names — `session` is the stored
/// (possibly pre-existing) session, `action` tells the caller what to do
/// next; confirm against the upsert call sites.
pub struct UpsertResponderSessionReturn {
    pub session: Arc<PeerSession>,
    pub action: PeerSessionAction,
}
@@ -87,9 +75,6 @@ impl PeerSessionStore {
self.sessions.insert(key, session);
}
/// Remove sessions that are no longer referenced by any PeerConn or RelayPeerMap.
/// A session with strong_count == 1 means only the store holds it — no active
/// connection is using it, so it can be safely cleaned up.
pub fn evict_unused_sessions(&self) {
self.sessions
.retain(|_key, session| Arc::strong_count(session) > 1);
@@ -188,7 +173,6 @@ impl PeerSessionStore {
}
PeerSessionAction::Sync | PeerSessionAction::Create => {
let root_key = root_key_32.ok_or_else(|| anyhow!("missing root_key"))?;
// If the existing session is invalidated, remove it so we create a fresh one
if let Some(existing) = self.sessions.get(key)
&& !existing.is_valid()
{
@@ -224,253 +208,25 @@ impl PeerSessionStore {
}
}
/// Cached pair of AEAD ciphers for one (epoch, generation).
#[derive(Clone, Default)]
struct EpochKeySlot {
    epoch: u32,
    generation: u32,
    valid: bool,
    send_cipher: Option<Arc<dyn Encryptor>>,
    recv_cipher: Option<Arc<dyn Encryptor>>,
}

// Manual Debug: the cipher handles are intentionally omitted from output.
impl std::fmt::Debug for EpochKeySlot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EpochKeySlot")
            .field("epoch", &self.epoch)
            .field("generation", &self.generation)
            .field("valid", &self.valid)
            .finish()
    }
}

impl EpochKeySlot {
    /// Return the cached send or receive cipher.
    ///
    /// Panics if the requested cipher was never populated; slots are only
    /// read after being fully initialized by the cache.
    fn get_encryptor(&self, is_send: bool) -> Arc<dyn Encryptor> {
        let cipher = if is_send {
            &self.send_cipher
        } else {
            &self.recv_cipher
        };
        cipher.as_ref().unwrap().clone()
    }
}
/// 256-bit sliding anti-replay window.
///
/// Bit 0 tracks `max_seq` (the highest sequence accepted so far); bit `d`
/// tracks `max_seq - d`. Sequences more than 256 behind `max_seq`, or
/// already marked as seen, are rejected.
#[derive(Debug, Clone, Copy, Default)]
struct ReplayWindow256 {
    // Highest sequence number accepted so far (meaningful only when `valid`).
    max_seq: u64,
    // Bit index i == "seen(max_seq - i)"; bit i lives at bitmap[i/8] >> (i%8).
    bitmap: [u8; 32],
    // False until the first sequence is accepted.
    valid: bool,
}

impl ReplayWindow256 {
    /// Total number of bits tracked by the window.
    const WINDOW_BITS: u64 = 256;

    /// Reset to the pristine (no packet seen) state.
    fn clear(&mut self) {
        self.max_seq = 0;
        self.bitmap.fill(0);
        self.valid = false;
    }

    fn test_bit(&self, idx: usize) -> bool {
        let byte = idx / 8;
        let bit = idx % 8;
        (self.bitmap[byte] >> bit) & 1 == 1
    }

    fn set_bit(&mut self, idx: usize) {
        let byte = idx / 8;
        let bit = idx % 8;
        self.bitmap[byte] |= 1u8 << bit;
    }

    /// Shift the whole bitmap toward older indices by `shift` bits,
    /// discarding bits that fall off the end of the window.
    fn shift_right(&mut self, shift: usize) {
        if shift == 0 {
            return;
        }
        let total_bits = 256usize;
        if shift >= total_bits {
            self.bitmap.fill(0);
            return;
        }
        let byte_shift = shift / 8;
        let bit_shift = shift % 8;
        if byte_shift > 0 {
            // Move whole bytes toward higher indices (older entries).
            for i in (0..self.bitmap.len()).rev() {
                self.bitmap[i] = if i >= byte_shift {
                    self.bitmap[i - byte_shift]
                } else {
                    0
                };
            }
        }
        if bit_shift > 0 {
            // Propagate the sub-byte remainder, carrying each byte's high
            // bits into the next byte.
            let mut carry = 0u8;
            for b in self.bitmap.iter_mut() {
                let new_carry = *b >> (8 - bit_shift);
                *b = (*b << bit_shift) | carry;
                carry = new_carry;
            }
        }
    }

    /// Accept `seq` if it has not been seen; returns false on replay or
    /// when `seq` is too far behind the window.
    fn accept(&mut self, seq: u64) -> bool {
        if !self.valid {
            self.valid = true;
            self.max_seq = seq;
            self.set_bit(0);
            return true;
        }
        if seq > self.max_seq {
            // Fix: clamp before casting. On 32-bit targets a u64 delta
            // larger than usize::MAX would otherwise truncate and shift
            // by the wrong amount, leaving stale "seen" bits in the window.
            let shift = (seq - self.max_seq).min(Self::WINDOW_BITS) as usize;
            self.shift_right(shift);
            self.max_seq = seq;
            self.set_bit(0);
            return true;
        }
        // Fix: compare in u64 before narrowing, for the same reason.
        let delta = self.max_seq - seq;
        if delta >= Self::WINDOW_BITS {
            return false;
        }
        let delta = delta as usize;
        if self.test_bit(delta) {
            return false;
        }
        self.set_bit(delta);
        true
    }
}
/// Per-direction receive state for a single epoch.
#[derive(Debug, Clone, Copy, Default)]
struct EpochRxSlot {
    epoch: u32,
    window: ReplayWindow256,
    last_rx_ms: u64,
    valid: bool,
}

impl EpochRxSlot {
    /// Reset the slot to its unused state.
    fn clear(&mut self) {
        // Equivalent to clearing each field: Default yields epoch 0, an
        // empty (invalid) window, last_rx_ms 0 and valid == false.
        *self = Self::default();
    }
}
#[derive(Debug, Clone, Copy, Default)]
struct SyncRxGrace {
slots: [[EpochRxSlot; 2]; 2],
expires_at_ms: u64,
valid: bool,
}
impl SyncRxGrace {
fn clear(&mut self) {
self.slots = [[EpochRxSlot::default(), EpochRxSlot::default()]; 2];
self.expires_at_ms = 0;
self.valid = false;
}
fn refresh(&mut self, slots: [[EpochRxSlot; 2]; 2], expires_at_ms: u64) {
self.slots = slots;
self.expires_at_ms = expires_at_ms;
self.valid = true;
}
fn maybe_expire(&mut self, now_ms: u64) {
if self.valid && now_ms >= self.expires_at_ms {
self.clear();
}
}
}
/// Per-peer secure session state.
///
/// NOTE(review): this diff view merges two revisions of the struct — the
/// legacy inline key/replay fields (root_key, send_epoch, rx_slots,
/// key_cache, ...) alongside the newer `datagram: SecureDatagramSession`
/// that supersedes them. As written the two sets coexist; confirm against
/// the actual file — only one set should remain.
pub struct PeerSession {
    peer_id: PeerId,
    root_key: RwLock<[u8; 32]>,
    session_generation: AtomicU32,
    peer_static_pubkey: RwLock<Option<[u8; 32]>>,
    send_epoch: AtomicU32,
    // One send sequence counter per direction.
    send_seq: [AtomicU64; 2],
    send_epoch_started_ms: AtomicU64,
    send_packets_since_epoch: AtomicU64,
    // [direction][slot]: current (0) and previous (1) epoch RX state.
    rx_slots: Mutex<[[EpochRxSlot; 2]; 2]>,
    key_cache: Mutex<[[EpochKeySlot; 2]; 2]>,
    sync_rx_grace: Mutex<SyncRxGrace>,
    // Atomic mirror of the grace expiry so the hot path can skip the lock.
    sync_rx_grace_expires_at_ms: AtomicU64,
    send_cipher_algorithm: String,
    recv_cipher_algorithm: String,
    // NOTE(review): the doc comment below describes `invalidated`, not
    // `datagram` — a diff artifact of the merged revisions.
    /// Set to true when the session is detected as corrupted (persistent decrypt failures).
    /// Holders of Arc<PeerSession> can check this to know the session should be discarded.
    datagram: SecureDatagramSession,
    invalidated: AtomicBool,
    /// Consecutive decrypt failure counter. Auto-invalidates when threshold is reached.
    decrypt_fail_count: AtomicU32,
}
// Manual Debug impl listing diagnostic fields explicitly.
// NOTE(review): `root_key` is included in the Debug output, which would
// print key material into logs — consider redacting it.
impl std::fmt::Debug for PeerSession {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PeerSession")
            .field("peer_id", &self.peer_id)
            .field("root_key", &self.root_key)
            .field("session_generation", &self.session_generation)
            .field("peer_static_pubkey", &self.peer_static_pubkey)
            .field("send_epoch", &self.send_epoch)
            .field("send_seq", &self.send_seq)
            .field("send_epoch_started_ms", &self.send_epoch_started_ms)
            .field("send_packets_since_epoch", &self.send_packets_since_epoch)
            .field("rx_slots", &self.rx_slots)
            .field("key_cache", &self.key_cache)
            .field("sync_rx_grace", &self.sync_rx_grace)
            .field(
                "sync_rx_grace_expires_at_ms",
                &self.sync_rx_grace_expires_at_ms,
            )
            .field("send_cipher_algorithm", &self.send_cipher_algorithm)
            .field("recv_cipher_algorithm", &self.recv_cipher_algorithm)
            .field("datagram", &self.datagram)
            .finish()
    }
}
impl PeerSession {
    /// Idle-eviction timeout for receive slots, in milliseconds.
    ///
    /// If no packets are received for this period (~30 seconds), the
    /// corresponding RX slot is considered idle and may be cleared/reused.
    /// This helps reclaim state for dead peers or paths while still tolerating
    /// short network stalls. Environments with very bursty or high-latency
    /// traffic may want to increase this value; low-latency or tightly
    /// resource-constrained deployments may lower it.
    const EVICT_IDLE_AFTER_MS: u64 = 30_000;
    /// Keep the pre-sync receive windows alive briefly so in-flight packets
    /// from the previous epochs are still accepted after a shared session is
    /// synced in place by another connection.
    const SYNC_RX_GRACE_AFTER_MS: u64 = 5_000;
    /// Maximum number of packets to send in a single epoch before forcing
    /// a key/epoch rotation.
    ///
    /// This bounds the amount of traffic protected under a single set of
    /// derived keys, which is a common best practice for long-lived secure
    /// channels. The current value (~1 million packets) is a conservative
    /// default chosen to balance security (more frequent rotation) and
    /// performance (avoiding excessive rekeying). Deployments with very high
    /// or very low packet rates may tune this threshold accordingly.
    const ROTATE_AFTER_PACKETS: u64 = 1_000_000;
    /// Maximum wall-clock lifetime of a send epoch, in milliseconds.
    ///
    /// Even if the packet-based limit is not reached, epochs are rotated
    /// after this duration (~10 minutes) to avoid long-lived keys and keep
    /// replay windows bounded in time. This also limits the impact of a
    /// compromised key. Installations that prioritize lower overhead over
    /// more aggressive key rotation may increase this value; those with
    /// stricter security requirements may decrease it.
    const ROTATE_AFTER_MS: u64 = 10 * 60 * 1000;
    /// How many epochs ahead of the local baseline an incoming packet may
    /// claim before being rejected outright (bounds state growth against
    /// attacker-chosen epochs).
    const MAX_ACCEPTED_RX_EPOCH_AHEAD: u32 = 3;
    /// Consecutive decrypt failures tolerated before auto-invalidation.
    const DECRYPT_FAIL_THRESHOLD: u32 = 10;
    // NOTE(review): SYNC_RX_GRACE_AFTER_MS is defined twice in this view —
    // the literal 5_000 above and the SecureDatagramSession re-export below.
    // This is two merged revisions; exactly one definition must remain.
    const SYNC_RX_GRACE_AFTER_MS: u64 = SecureDatagramSession::SYNC_RX_GRACE_AFTER_MS;
    /// Construct a new session.
    ///
    /// NOTE(review): this view merges two revisions of `new` — the legacy
    /// version that initialized the inline key/replay fields directly, and
    /// the newer one that delegates that state to `SecureDatagramSession`.
    /// As written the duplicated initializers cannot both be correct; only
    /// one set belongs in the real file.
    pub fn new(
        peer_id: PeerId,
@@ -481,32 +237,17 @@ impl PeerSession {
        recv_cipher_algorithm: String,
        peer_static_pubkey: Option<[u8; 32]>,
    ) -> Self {
        let rx_slots = [
            [EpochRxSlot::default(), EpochRxSlot::default()],
            [EpochRxSlot::default(), EpochRxSlot::default()],
        ];
        let key_cache = [
            [EpochKeySlot::default(), EpochKeySlot::default()],
            [EpochKeySlot::default(), EpochKeySlot::default()],
        ];
        let now_ms = now_ms();
        Self {
            peer_id,
            root_key: RwLock::new(root_key),
            session_generation: AtomicU32::new(session_generation),
            peer_static_pubkey: RwLock::new(peer_static_pubkey),
            send_epoch: AtomicU32::new(initial_epoch),
            send_seq: [AtomicU64::new(0), AtomicU64::new(0)],
            send_epoch_started_ms: AtomicU64::new(now_ms),
            send_packets_since_epoch: AtomicU64::new(0),
            rx_slots: Mutex::new(rx_slots),
            key_cache: Mutex::new(key_cache),
            sync_rx_grace: Mutex::new(SyncRxGrace::default()),
            sync_rx_grace_expires_at_ms: AtomicU64::new(0),
            send_cipher_algorithm,
            recv_cipher_algorithm,
            datagram: SecureDatagramSession::new(
                root_key,
                session_generation,
                initial_epoch,
                send_cipher_algorithm,
                recv_cipher_algorithm,
            ),
            invalidated: AtomicBool::new(false),
            decrypt_fail_count: AtomicU32::new(0),
        }
    }
@@ -514,44 +255,29 @@ impl PeerSession {
        self.peer_id
    }
    /// Mark this session as invalid. All holders of Arc<PeerSession> will see this.
    pub fn invalidate(&self) {
        // Flag the local state and forward to the datagram session.
        self.invalidated.store(true, Ordering::Relaxed);
        self.datagram.invalidate();
    }
    // NOTE(review): each accessor below shows two bodies — the legacy
    // inline-state version and the newer delegation to `self.datagram` —
    // merged by the diff. Exactly one body per function belongs in the
    // real file; as written they do not compile.
    pub fn is_valid(&self) -> bool {
        !self.invalidated.load(Ordering::Relaxed)
        !self.invalidated.load(Ordering::Relaxed) && self.datagram.is_valid()
    }
    pub fn session_generation(&self) -> u32 {
        self.session_generation.load(Ordering::Relaxed)
        self.datagram.session_generation()
    }
    pub fn root_key(&self) -> [u8; 32] {
        *self.root_key.read().unwrap()
        self.datagram.root_key()
    }
    /// Generate a fresh random 32-byte root key.
    pub fn new_root_key() -> [u8; 32] {
        let mut out = [0u8; 32];
        rand::rngs::OsRng.fill_bytes(&mut out);
        out
        SecureDatagramSession::new_root_key()
    }
    /// Epoch to propose in the next Sync: one past the highest epoch seen
    /// on either the send side or any live receive slot.
    pub fn next_sync_epoch(&self) -> u32 {
        let send_epoch = self.send_epoch.load(Ordering::Relaxed);
        let rx = self.rx_slots.lock().unwrap();
        let mut max_epoch = send_epoch;
        for dir in 0..2 {
            let cur = rx[dir][0];
            if cur.valid {
                max_epoch = max_epoch.max(cur.epoch);
            }
            let prev = rx[dir][1];
            if prev.valid {
                max_epoch = max_epoch.max(prev.epoch);
            }
        }
        max_epoch.wrapping_add(1)
        self.datagram.next_sync_epoch()
    }
    /// Ensure the caller's cipher choices match this session's.
    ///
    /// NOTE(review): diff artifact — both the legacy inline comparison and
    /// the newer delegation to `self.datagram` are shown below; only one
    /// belongs in the real file.
    pub fn check_encrypt_algo_same(
@@ -559,12 +285,8 @@ impl PeerSession {
        send_algorithm: &str,
        recv_algorithm: &str,
    ) -> Result<(), anyhow::Error> {
        if self.send_cipher_algorithm != send_algorithm
            || self.recv_cipher_algorithm != recv_algorithm
        {
            return Err(anyhow!("encrypt algorithm not same"));
        }
        Ok(())
        self.datagram
            .check_encrypt_algo_same(send_algorithm, recv_algorithm)
    }
    // NOTE(review): diff artifact — the signature line below belongs to
    // `check_or_set_peer_static_pubkey`, but the parameters and body that
    // follow are `sync_root_key`'s (the hunk header hides the intervening
    // lines). Both the old inline implementation and the new delegation to
    // `self.datagram.sync_root_key(...)` are shown; only one belongs in
    // the real file.
    pub fn check_or_set_peer_static_pubkey(
@@ -592,277 +314,25 @@ impl PeerSession {
        initial_epoch: u32,
        preserve_rx_grace: bool,
    ) {
        let old_root_key = self.root_key();
        // Grace only makes sense when the key itself is unchanged.
        let can_preserve_rx_grace = preserve_rx_grace && old_root_key == root_key;
        {
            let mut g = self.root_key.write().unwrap();
            *g = root_key;
        }
        self.session_generation
            .store(session_generation, Ordering::Relaxed);
        self.send_epoch.store(initial_epoch, Ordering::Relaxed);
        self.send_seq[0].store(0, Ordering::Relaxed);
        self.send_seq[1].store(0, Ordering::Relaxed);
        self.send_epoch_started_ms
            .store(now_ms(), Ordering::Relaxed);
        self.send_packets_since_epoch.store(0, Ordering::Relaxed);
        {
            let mut rx = self.rx_slots.lock().unwrap();
            let mut sync_rx_grace = self.sync_rx_grace.lock().unwrap();
            if can_preserve_rx_grace {
                // Snapshot the current RX windows so in-flight packets from
                // pre-sync epochs stay acceptable during the grace period.
                let expires_at_ms = now_ms().saturating_add(Self::SYNC_RX_GRACE_AFTER_MS);
                sync_rx_grace.refresh(*rx, expires_at_ms);
                self.sync_rx_grace_expires_at_ms
                    .store(expires_at_ms, Ordering::Relaxed);
            } else {
                sync_rx_grace.clear();
                self.sync_rx_grace_expires_at_ms.store(0, Ordering::Relaxed);
            }
            for dir in 0..2 {
                rx[dir][0].clear();
                rx[dir][1].clear();
            }
        }
        self.key_cache
            .lock()
            .unwrap()
            .fill([EpochKeySlot::default(), EpochKeySlot::default()]);
        self.datagram.sync_root_key(
            root_key,
            session_generation,
            initial_epoch,
            preserve_rx_grace,
        );
    }
    /// Map the (sender, receiver) peer-id pair onto a stable direction:
    /// the numerically smaller peer id always sends on the "A to B" side.
    ///
    /// NOTE(review): diff artifact — the old `-> usize` signature/body
    /// (0/1) and the new `-> SecureDatagramDirection` version are merged
    /// below; only one belongs in the real file.
    pub fn dir_for_sender(sender_peer_id: PeerId, receiver_peer_id: PeerId) -> usize {
    pub fn dir_for_sender(
        sender_peer_id: PeerId,
        receiver_peer_id: PeerId,
    ) -> SecureDatagramDirection {
        if sender_peer_id < receiver_peer_id {
            0
            SecureDatagramDirection::AToB
        } else {
            1
            SecureDatagramDirection::BToA
        }
    }
fn hkdf_traffic_key(&self, epoch: u32, dir: usize) -> [u8; 32] {
let root_key = self.root_key();
let salt = [0u8; 32];
let mut extract = HmacSha256::new_from_slice(&salt).unwrap();
extract.update(&root_key);
let prk = extract.finalize().into_bytes();
let mut info = Vec::with_capacity(9 + 4 + 1);
info.extend_from_slice(b"et-traffic");
info.extend_from_slice(&epoch.to_be_bytes());
info.push(dir as u8);
let mut expand = HmacSha256::new_from_slice(&prk).unwrap();
expand.update(&info);
expand.update(&[1u8]);
let okm = expand.finalize().into_bytes();
let mut key = [0u8; 32];
key.copy_from_slice(&okm[..32]);
key
}
    /// Return the cached AEAD cipher for `(epoch, dir, generation)`,
    /// deriving and caching a fresh pair on miss.
    ///
    /// The cache holds two slots per direction; a miss prefers slot 0 when
    /// it is empty or already tracks this epoch, otherwise it overwrites
    /// slot 1.
    fn get_or_create_encryptor(
        &self,
        epoch: u32,
        dir: usize,
        generation: u32,
        is_send: bool,
    ) -> Arc<dyn Encryptor> {
        let mut guard = self.key_cache.lock().unwrap();
        // Fast path: exact (epoch, generation) hit in either slot.
        for slot in guard[dir].iter_mut() {
            if slot.valid && slot.epoch == epoch && slot.generation == generation {
                return slot.get_encryptor(is_send);
            }
        }
        // Miss: derive the 32-byte traffic key; ciphers that take a
        // 128-bit key use its first half.
        let key = self.hkdf_traffic_key(epoch, dir);
        let mut key_128 = [0u8; 16];
        key_128.copy_from_slice(&key[..16]);
        let slot = EpochKeySlot {
            epoch,
            generation,
            valid: true,
            send_cipher: Some(create_encryptor(&self.send_cipher_algorithm, key_128, key)),
            recv_cipher: Some(create_encryptor(&self.recv_cipher_algorithm, key_128, key)),
        };
        let ret = slot.get_encryptor(is_send);
        // Replacement policy: keep slot 0 for the empty/current-epoch case,
        // spill other entries into slot 1.
        if !guard[dir][0].valid || guard[dir][0].epoch == epoch {
            guard[dir][0] = slot;
        } else {
            guard[dir][1] = slot;
        }
        ret
    }
fn maybe_rotate_epoch(&self, now_ms: u64) {
let packets = self
.send_packets_since_epoch
.fetch_add(1, Ordering::Relaxed)
+ 1;
let started = self.send_epoch_started_ms.load(Ordering::Relaxed);
if packets < Self::ROTATE_AFTER_PACKETS
&& now_ms.saturating_sub(started) < Self::ROTATE_AFTER_MS
{
return;
}
let cur = self.send_epoch.load(Ordering::Relaxed);
let next = cur.wrapping_add(1);
if self
.send_epoch
.compare_exchange(cur, next, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.send_epoch_started_ms.store(now_ms, Ordering::Relaxed);
self.send_packets_since_epoch.store(0, Ordering::Relaxed);
}
}
fn next_nonce(&self, dir: usize) -> (u32, u64, [u8; 12]) {
let now_ms = now_ms();
self.maybe_rotate_epoch(now_ms);
let epoch = self.send_epoch.load(Ordering::Relaxed);
let seq = self.send_seq[dir].fetch_add(1, Ordering::Relaxed);
let mut nonce = [0u8; 12];
nonce[..4].copy_from_slice(&epoch.to_be_bytes());
nonce[4..].copy_from_slice(&seq.to_be_bytes());
(epoch, seq, nonce)
}
fn parse_tail(payload: &[u8]) -> Option<[u8; 12]> {
let tail = StandardAeadTail::ref_from_suffix(payload)?;
Some(tail.nonce)
}
fn evict_old_rx_slots(rx: &mut [[EpochRxSlot; 2]; 2], now_ms: u64) {
for dir_slots in rx.iter_mut() {
for slot in dir_slots.iter_mut() {
if !slot.valid {
continue;
}
let last = slot.last_rx_ms;
if last != 0 && now_ms.saturating_sub(last) > Self::EVICT_IDLE_AFTER_MS {
slot.clear();
}
}
}
}
fn epoch_in_slots(slots: &[EpochRxSlot; 2], epoch: u32) -> bool {
slots[0].valid && slots[0].epoch == epoch || slots[1].valid && slots[1].epoch == epoch
}
fn sync_rx_grace_active(&self, now_ms: u64) -> bool {
let expires_at_ms = self.sync_rx_grace_expires_at_ms.load(Ordering::Relaxed);
if expires_at_ms == 0 {
return false;
}
if now_ms < expires_at_ms {
return true;
}
self.sync_rx_grace_expires_at_ms.store(0, Ordering::Relaxed);
false
}
    /// Anti-replay check for an incoming packet `(epoch, seq)` in
    /// direction `dir`. Returns true when the packet should be accepted.
    ///
    /// Side bookkeeping on every call: idle RX slots are evicted, the
    /// post-sync grace snapshot is expired/evicted, and cached key slots
    /// whose epoch is no longer referenced anywhere are invalidated.
    fn check_replay(&self, epoch: u32, seq: u64, dir: usize, now_ms: u64) -> bool {
        let mut rx = self.rx_slots.lock().unwrap();
        Self::evict_old_rx_slots(&mut rx, now_ms);
        // Only take the grace lock when the cheap atomic says the window
        // may still be live; re-check under the lock and drop if expired.
        let mut sync_rx_grace = if self.sync_rx_grace_active(now_ms) {
            let mut sync_rx_grace = self.sync_rx_grace.lock().unwrap();
            sync_rx_grace.maybe_expire(now_ms);
            if sync_rx_grace.valid {
                Self::evict_old_rx_slots(&mut sync_rx_grace.slots, now_ms);
                Some(sync_rx_grace)
            } else {
                self.sync_rx_grace_expires_at_ms.store(0, Ordering::Relaxed);
                None
            }
        } else {
            None
        };
        let send_epoch = self.send_epoch.load(Ordering::Relaxed);
        {
            // Garbage-collect cached keys for epochs no longer referenced
            // by the send epoch, any RX slot, or the grace snapshot.
            let mut key_cache = self.key_cache.lock().unwrap();
            for d in 0..2 {
                for s in 0..2 {
                    if !key_cache[d][s].valid {
                        continue;
                    }
                    let e = key_cache[d][s].epoch;
                    let allowed = e == send_epoch
                        || rx[d][0].valid && rx[d][0].epoch == e
                        || rx[d][1].valid && rx[d][1].epoch == e
                        || sync_rx_grace
                            .as_ref()
                            .is_some_and(|g| Self::epoch_in_slots(&g.slots[d], e));
                    if !allowed {
                        key_cache[d][s].valid = false;
                    }
                }
            }
        }
        // Pre-sync epochs still inside the grace window are checked against
        // the snapshotted replay windows.
        if sync_rx_grace
            .as_ref()
            .is_some_and(|g| Self::epoch_in_slots(&g.slots[dir], epoch))
        {
            for slot in sync_rx_grace.as_mut().unwrap().slots[dir].iter_mut() {
                if slot.valid && slot.epoch == epoch {
                    slot.last_rx_ms = now_ms;
                    return slot.window.accept(seq);
                }
            }
        }
        // First packet in this direction establishes the epoch.
        if !rx[dir][0].valid {
            rx[dir][0] = EpochRxSlot {
                epoch,
                window: ReplayWindow256::default(),
                last_rx_ms: now_ms,
                valid: true,
            };
        }
        // Current or previous epoch: delegate to that epoch's window.
        if rx[dir][0].valid && epoch == rx[dir][0].epoch {
            rx[dir][0].last_rx_ms = now_ms;
            return rx[dir][0].window.accept(seq);
        }
        if rx[dir][1].valid && epoch == rx[dir][1].epoch {
            rx[dir][1].last_rx_ms = now_ms;
            return rx[dir][1].window.accept(seq);
        }
        // Newer epoch: accept only within MAX_ACCEPTED_RX_EPOCH_AHEAD of
        // the local baseline, then rotate slots (current -> previous)
        // without poisoning the established window.
        if rx[dir][0].valid && epoch > rx[dir][0].epoch {
            let mut baseline_epoch = send_epoch;
            if rx[dir][0].valid {
                baseline_epoch = baseline_epoch.max(rx[dir][0].epoch);
            }
            if rx[dir][1].valid {
                baseline_epoch = baseline_epoch.max(rx[dir][1].epoch);
            }
            let max_allowed_epoch =
                baseline_epoch.saturating_add(Self::MAX_ACCEPTED_RX_EPOCH_AHEAD);
            if epoch > max_allowed_epoch {
                return false;
            }
            rx[dir][1] = rx[dir][0];
            rx[dir][0] = EpochRxSlot {
                epoch,
                window: ReplayWindow256::default(),
                last_rx_ms: now_ms,
                valid: true,
            };
            return rx[dir][0].window.accept(seq);
        }
        // Older epoch with no matching slot (and not in grace): reject.
        false
    }
    /// Encrypt `pkt` in place for the given sender→receiver direction.
    ///
    /// NOTE(review): diff artifact — the legacy inline nonce/encryptor
    /// path and the newer delegation to `self.datagram.encrypt_payload`
    /// are both shown; only one belongs in the real file.
    pub fn encrypt_payload(
        &self,
        sender_peer_id: PeerId,
@@ -872,19 +342,8 @@ impl PeerSession {
        if !self.is_valid() {
            return Err(anyhow!("session invalidated"));
        }
        let dir = Self::dir_for_sender(sender_peer_id, receiver_peer_id);
        let (epoch, _seq, nonce_bytes) = self.next_nonce(dir);
        let encryptor = self.get_or_create_encryptor(epoch, dir, self.session_generation(), true);
        if let Err(e) = encryptor.encrypt_with_nonce(pkt, Some(nonce_bytes.as_slice())) {
            // Encrypt failure means this key state is unusable — poison
            // the session so callers re-handshake.
            tracing::warn!(
                peer_id = ?self.peer_id,
                ?e,
                "session encrypt failed, invalidating"
            );
            self.invalidate();
            return Err(e.into());
        }
        Ok(())
        self.datagram
            .encrypt_payload(Self::dir_for_sender(sender_peer_id, receiver_peer_id), pkt)
    }
    /// Decrypt a packet in place, enforcing the anti-replay window and the
    /// consecutive-failure auto-invalidation policy.
    ///
    /// NOTE(review): diff artifact — the legacy inline replay/decrypt path
    /// and the newer delegation to `self.datagram.decrypt_payload` are
    /// both shown; only one belongs in the real file.
    pub fn decrypt_payload(
@@ -896,47 +355,13 @@ impl PeerSession {
        if !self.is_valid() {
            return Err(anyhow!("session invalidated"));
        }
        let dir = Self::dir_for_sender(sender_peer_id, receiver_peer_id);
        let nonce_bytes =
            Self::parse_tail(ciphertext_with_tail.payload()).ok_or_else(|| anyhow!("no tail"))?;
        // Nonce layout: epoch (4 bytes BE) || seq (8 bytes BE).
        let epoch = u32::from_be_bytes(nonce_bytes[..4].try_into().unwrap());
        let seq = u64::from_be_bytes(nonce_bytes[4..].try_into().unwrap());
        let now_ms = now_ms();
        if !self.check_replay(epoch, seq, dir, now_ms) {
            return Err(anyhow!(
                "replay rejected, sender_peer_id: {:?}, receiver_peer_id: {:?}",
                sender_peer_id,
                receiver_peer_id
            ));
        }
        let encryptor = self.get_or_create_encryptor(epoch, dir, self.session_generation(), false);
        if let Err(e) = encryptor.decrypt(ciphertext_with_tail) {
            // A run of consecutive failures means the shared key state has
            // diverged; invalidate so the session is rebuilt.
            let count = self.decrypt_fail_count.fetch_add(1, Ordering::Relaxed) + 1;
            if count >= Self::DECRYPT_FAIL_THRESHOLD {
                self.invalidate();
                tracing::warn!(
                    peer_id = ?self.peer_id,
                    count,
                    "session auto-invalidated after consecutive decrypt failures"
                );
            }
            return Err(e.into());
        }
        self.decrypt_fail_count.store(0, Ordering::Relaxed);
        Ok(())
        self.datagram.decrypt_payload(
            Self::dir_for_sender(sender_peer_id, receiver_peer_id),
            ciphertext_with_tail,
        )
    }
}
/// Milliseconds since the Unix epoch; returns 0 if the system clock is
/// set before the epoch (duration_since fails).
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
#[cfg(test)]
mod tests {
    use super::*;
@@ -984,177 +409,10 @@ mod tests {
    }
    #[test]
    fn replay_rejects_far_future_epoch_without_poisoning_window() {
        let peer_id: PeerId = 10;
        let root_key = PeerSession::new_root_key();
        let generation = 1u32;
        let initial_epoch = 0u32;
        let s = PeerSession::new(
            peer_id,
            root_key,
            generation,
            initial_epoch,
            "aes-256-gcm".to_string(),
            "aes-256-gcm".to_string(),
            None,
        );
        let now = now_ms();
        // Epoch 0 accepted normally; a far-future epoch (1000) is rejected
        // and must not disturb the established window, so epoch 1 still works.
        assert!(s.check_replay(0, 1, 0, now));
        assert!(s.check_replay(0, 2, 0, now));
        assert!(!s.check_replay(1000, 1, 0, now));
        assert!(s.check_replay(1, 1, 0, now + 1));
        assert!(s.check_replay(1, 2, 0, now + 2));
    }
    #[test]
    fn replay_window_shift_preserves_bits() {
        let mut w = ReplayWindow256::default();
        // Accept seqs 0..10
        for i in 0..10u64 {
            assert!(w.accept(i), "seq {i} should be accepted");
        }
        assert_eq!(w.max_seq, 9);
        // All seqs 0..10 should be marked as seen (replay)
        for i in 0..10u64 {
            assert!(!w.accept(i), "seq {i} should be rejected as replay");
        }
        // Seq 10 should still be accepted
        assert!(w.accept(10));
    }
    #[test]
    fn replay_window_out_of_order_within_window() {
        let mut w = ReplayWindow256::default();
        // Accept even seqs 0,2,4,...,20
        for i in (0..=20u64).step_by(2) {
            assert!(w.accept(i), "seq {i} should be accepted");
        }
        // Now accept odd seqs 1,3,5,...,19 (out of order, within window)
        for i in (1..=19u64).step_by(2) {
            assert!(w.accept(i), "seq {i} should be accepted (out of order)");
        }
        // All seqs 0..=20 should now be marked as seen
        for i in 0..=20u64 {
            assert!(!w.accept(i), "seq {i} should be rejected as replay");
        }
    }
    #[test]
    fn sync_root_key_allows_any_epoch_from_remote() {
        // After sync_root_key, the remote peer may still be sending at an
        // old epoch. The receiver should accept those packets.
        let peer_id: PeerId = 10;
        let root_key = PeerSession::new_root_key();
        let s = PeerSession::new(
            peer_id,
            root_key,
            1,
            0,
            "aes-256-gcm".to_string(),
            "aes-256-gcm".to_string(),
            None,
        );
        // Simulate receiving some packets at epoch 0
        let now = now_ms();
        assert!(s.check_replay(0, 0, 0, now));
        assert!(s.check_replay(0, 1, 0, now));
        // Sync with initial_epoch=2 (simulating a Sync action)
        s.sync_root_key(root_key, 2, 2, true);
        // Remote peer is still sending at epoch 0 — should be accepted
        // (rx_slots were cleared, so the first packet establishes the epoch)
        assert!(
            s.check_replay(0, 10, 0, now + 1),
            "packets at old epoch should be accepted after sync"
        );
    }
    #[test]
    fn sync_root_key_keeps_previous_epochs_during_grace_window() {
        let peer_id: PeerId = 10;
        let root_key = PeerSession::new_root_key();
        let s = PeerSession::new(
            peer_id,
            root_key,
            1,
            0,
            "aes-256-gcm".to_string(),
            "aes-256-gcm".to_string(),
            None,
        );
        let now = now_ms();
        assert!(s.check_replay(0, 0, 0, now));
        assert!(s.check_replay(1, 0, 0, now + 1));
        s.sync_root_key(root_key, 2, 2, true);
        // The first packet after sync may already use the new epoch.
        assert!(s.check_replay(2, 0, 0, now + 2));
        // Older in-flight packets from pre-sync epochs should still be accepted
        // during the grace period, regardless of arrival order.
        assert!(s.check_replay(1, 1, 0, now + 3));
        assert!(s.check_replay(0, 1, 0, now + 4));
    }
    #[test]
    fn sync_root_key_expires_previous_epochs_after_grace_window() {
        let peer_id: PeerId = 10;
        let root_key = PeerSession::new_root_key();
        let s = PeerSession::new(
            peer_id,
            root_key,
            1,
            0,
            "aes-256-gcm".to_string(),
            "aes-256-gcm".to_string(),
            None,
        );
        let now = now_ms();
        assert!(s.check_replay(0, 0, 0, now));
        assert!(s.check_replay(1, 0, 0, now + 1));
        s.sync_root_key(root_key, 2, 2, true);
        assert!(s.check_replay(2, 0, 0, now + 2));
        assert!(
            !s.check_replay(0, 1, 0, now + PeerSession::SYNC_RX_GRACE_AFTER_MS + 3),
            "old epochs should stop being accepted once the sync grace window expires"
        );
    }
    // NOTE(review): diff artifact — the body of the removed test
    // `sync_root_key_does_not_preserve_previous_epochs_when_root_key_changes`
    // and the header of the added test
    // `sync_root_key_preserves_generic_grace_window_constant` are merged
    // below (note the missing closing brace and `#[test]` attribute before
    // the second `fn`); this span does not compile as shown.
    #[test]
    fn sync_root_key_does_not_preserve_previous_epochs_when_root_key_changes() {
        let peer_id: PeerId = 10;
        let root_key = PeerSession::new_root_key();
        let s = PeerSession::new(
            peer_id,
            root_key,
            1,
            0,
            "aes-256-gcm".to_string(),
            "aes-256-gcm".to_string(),
            None,
        );
        let now = now_ms();
        assert!(s.check_replay(0, 0, 0, now));
        assert!(s.check_replay(1, 0, 0, now + 1));
        s.sync_root_key(PeerSession::new_root_key(), 2, 2, true);
        assert!(s.check_replay(2, 0, 0, now + 2));
        assert!(
            !s.check_replay(1, 1, 0, now + 3),
            "old epochs should not be preserved when sync replaces the root key"
    fn sync_root_key_preserves_generic_grace_window_constant() {
        assert_eq!(
            PeerSession::SYNC_RX_GRACE_AFTER_MS,
            SecureDatagramSession::SYNC_RX_GRACE_AFTER_MS
        );
    }
}