diff --git a/Cargo.lock b/Cargo.lock index f4c08192..b42e321d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -816,7 +816,7 @@ dependencies = [ "bitflags 2.8.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "regex", @@ -2234,6 +2234,7 @@ dependencies = [ "sha2", "shellexpand", "smoltcp", + "snow", "socket2 0.5.10", "stun_codec", "sys-locale", @@ -2268,6 +2269,7 @@ dependencies = [ "windows-service", "windows-sys 0.52.0", "winreg 0.52.0", + "x25519-dalek", "zerocopy", "zip", "zstd", @@ -6879,15 +6881,14 @@ checksum = "fc7c8f7f733062b66dc1c63f9db168ac0b97a9210e247fa90fdc9ad08f51b302" [[package]] name = "ring" -version = "0.17.8" +version = "0.17.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", "cfg-if", "getrandom 0.2.15", "libc", - "spin", "untrusted", "windows-sys 0.52.0", ] @@ -7985,6 +7986,23 @@ dependencies = [ "managed", ] +[[package]] +name = "snow" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "599b506ccc4aff8cf7844bc42cf783009a434c1e26c964432560fb6d6ad02d82" +dependencies = [ + "aes-gcm", + "blake2", + "chacha20poly1305", + "curve25519-dalek", + "getrandom 0.3.2", + "ring", + "rustc_version", + "sha2", + "subtle", +] + [[package]] name = "socket2" version = "0.5.10" diff --git a/easytier-contrib/easytier-uptime/src/health_checker.rs b/easytier-contrib/easytier-uptime/src/health_checker.rs index 44cbc7af..25b2fdcf 100644 --- a/easytier-contrib/easytier-uptime/src/health_checker.rs +++ b/easytier-contrib/easytier-uptime/src/health_checker.rs @@ -359,6 +359,7 @@ impl HealthChecker { ) .parse() .with_context(|| "failed to parse peer uri")?, + peer_public_key: None, }]); let inst_id = inst_id.unwrap_or(uuid::Uuid::new_v4()); diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 78609387..c8abff83 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -154,6 +154,8 @@ ring = { version = "0.17", optional = true } bitflags = "2.5" aes-gcm = { version = "0.10.3", optional = true } openssl = { version = "0.10", optional = true, features = ["vendored"] } +snow = "0.10.0" +x25519-dalek = "2.0" # for cli tabled = "0.16" diff --git a/easytier/docs/peer_conn_secure_mode_v3.md b/easytier/docs/peer_conn_secure_mode_v3.md new file mode 100644 index 00000000..571180d2 --- /dev/null +++ b/easytier/docs/peer_conn_secure_mode_v3.md @@ -0,0 +1,509 @@ +# PeerConn Secure Mode(乱序隧道友好) + +本文是对“PeerConn 安全模式”下一阶段协议的完整规格草案,目标是在底层 `Tunnel` **不保证顺序交付**(可能乱序/丢包)的前提下: + +- 仍使用 Noise 进行握手(加密、认证、channel binding) +- 数据面不使用 `snow::TransportState` 逐包加解密(因为它隐式递增 nonce,要求有序) +- 在数据包尾部携带 **12B 明文 nonce**(与项目当前的“包尾 nonce”加密格式对齐),并把 **epoch 编进 nonce** +- 以尽可能低的内存开销实现 anti-replay(默认窗口 256) +- 多条 PeerConn 之间复用同一份 Peer 级安全会话(PeerSession) + +该文档只描述协议与数据结构;实现时以本文为准做迭代。 + +--- + +## 背景 + +### 节点角色 + +系统中常见两类角色(由配置与信任锚点决定,而非代码里硬编码的“节点类型”): + +- **用户节点(同网节点)**:通常持有 `network_secret`,期望与同一 `network_name` 的其他节点建立强认证连接。 +- **共享节点(基础设施节点)**:通常不持有用户的 `network_secret`,用于为多个用户网络提供转发/中继能力;客户端可通过 pinning 共享节点的长期 static 公钥获得“服务器认证”。 + +基于握手中交换的 `network_name` 可以得到 **角色提示(role hint)**: + +- `a_network_name == b_network_name`:同网提示 +- 否则:共享节点/外网提示 + +但 `network_name` 不是认证锚点;安全决策应基于 pinning 或 `network_secret_confirmed`(见 8)。 + +### 连接方式与 PeerConn + +在实现中,“peer 与 peer 之间的连接”由 `PeerConn` 表示,它绑定一条底层 `Tunnel`(可能是 tcp/udp/quic/wg/ring 等),并以 `PeerManagerHeader` 承载上层消息: + +- `PacketType::HandShake`:传统 PeerConn 握手 +- `PacketType::NoiseHandshake`:安全模式下的 Noise 握手 + +参考:[packet_def.rs](file:///data/project/EasyTier/easytier/src/tunnel/packet_def.rs#L59-L77)。 + +PeerManager 会在连接建立时走不同入口: + +- 主动方:`add_client_tunnel` -> `PeerConn::do_handshake_as_client` +- 被动方:`add_tunnel_as_server` -> `PeerConn::do_handshake_as_server` + +参考:[peer_manager.rs](file:///data/project/EasyTier/easytier/src/peers/peer_manager.rs#L361-L379)。 + +### 多连接与 foreign network + +同一对 peer 之间可能出现多条 PeerConn(多路径、多协议、重连等),因此需要一个 Peer 级别的“安全会话”来复用认证结果与数据面密钥(见 7.3 的 PeerSession)。 + +此外,当握手得到的对端 `network_name` 与本地不一致时,PeerManager 会将该连接纳入 foreign network 相关逻辑(例如 foreign network client/manager)以支持“共享节点”模式与跨网络转发: + +参考:[peer_manager.rs](file:///data/project/EasyTier/easytier/src/peers/peer_manager.rs#L361-L377)。 + +### 为什么需要本方案 + +若数据面直接使用 `snow::TransportState` 逐包加解密,会隐式递增 nonce,从而要求底层按序交付。由于本项目的数据加密格式本就采用“包尾明文 nonce”(例如 `AesGcmTail.nonce[12]`,以及 ring ChaCha20-Poly1305 的同形 tail),因此本文延续“尾部 nonce”风格,并将 nonce 结构化为 `epoch||seq`,以满足: + +- 乱序可解密 +- 低内存 anti-replay +- epoch/key 轮换 +- 多 PeerConn 复用 PeerSession + +参考:[packet_def.rs:AesGcmTail](file:///data/project/EasyTier/easytier/src/tunnel/packet_def.rs#L266-L273)、[ring_chacha20.rs](file:///data/project/EasyTier/easytier/src/peers/encrypt/ring_chacha20.rs#L69-L93)。 + +--- + +## 0. 约束与假设 + +- 底层 tunnel 可能乱序/丢包;因此数据面必须支持乱序解密。 +- 外层已有 `PeerManagerHeader`,包含 `from_peer_id` / `to_peer_id`,可作为对端身份索引,数据面无需额外携带 `session_id`。 +- 保持既有的安全语义目标: + - 共享节点 pinning(基于对端 Noise static pubkey) + - network_secret 的 channel binding 确认(`handshake_hash`) + - “尽早交换 network_name”用于角色判断(同网/共享节点) + +--- + +## 1. 术语 + +- **PeerConn**:一条具体的底层连接/路径(可能同一对 peer 存在多条)。 +- **PeerSession**:Peer 级别的安全会话状态(密钥、epoch、nonce、anti-replay、认证等级等)。 +- **epoch**:数据面密钥版本号(key id)。编码进 12B nonce 的高 4B。 +- **seq**:发送序号(per-direction 单调递增 u64)。编码进 12B nonce 的低 8B。 +- **nonce12**:明文 12B nonce,按 `epoch||seq` 编码,附在密文尾部。 +- **AAD**:AEAD 的附加认证数据。本文建议使用空 AAD(与项目当前的 ring/openssl 加密实现一致),未来可扩展为覆盖部分 header。 + +--- + +## 2. 关键 wire 结构(参考) + +### 2.1 PeerManagerHeader(16B) + +来自 [packet_def.rs](file:///data/project/EasyTier/easytier/src/tunnel/packet_def.rs#L93-L105): + +| 字段 | 类型 | 大小 | +| --------------- | -------: | ------: | +| from_peer_id | u32 (LE) | 4 | +| to_peer_id | u32 (LE) | 4 | +| packet_type | u8 | 1 | +| flags | u8 | 1 | +| forward_counter | u8 | 1 | +| reserved | u8 | 1 | +| len | u32 (LE) | 4 | +| **合计** | | **16B** | + +### 2.2 AES-GCM 包尾(28B) + +来自 [packet_def.rs](file:///data/project/EasyTier/easytier/src/tunnel/packet_def.rs#L266-L273): + +```text +AesGcmTail { + tag: [u8; 16] // 16B + nonce: [u8; 12] // 12B +} // 合计 28B +``` + +ring ChaCha20-Poly1305 的尾部结构与之同形: +[ring_chacha20.rs](file:///data/project/EasyTier/easytier/src/peers/encrypt/ring_chacha20.rs#L8-L16)。 + +--- + +## 3. 数据面:nonce/epoch/seq 规格 + +### 3.1 Nonce12(12B,明文附在包尾) + +定义: + +| 字段 | 编码 | 大小 | +| -------- | -------------- | ------: | +| epoch | u32 big-endian | 4 | +| seq | u64 big-endian | 8 | +| **合计** | | **12B** | + +记为: + +```text +nonce12 = epoch_be_u32 || seq_be_u64 +``` + +### 3.2 发送端规则(每方向) + +- `seq`:u64 单调递增,从 0 开始,每发送一个包 `seq += 1`。 +- `epoch`:u32,初始为 0。轮换时 `epoch += 1` 并切换到新 key。 +- `nonce12`:按 `epoch||seq` 生成,作为 AEAD nonce,同时明文写入包尾。 + +**安全性要求**:同一把 data key 下,`nonce12` 必须不重复。该要求通过“epoch 变化必然对应 key 变化 + 同一 epoch 内 seq 单调递增”保证。 + +### 3.3 接收端规则(每方向) + +通信是双向的:双方都会为每个对端 peer 维护一份 `PeerSession`。其中发送方向状态用于生成 `nonce12`(见 3.2),接收方向状态用于乱序解密与 anti-replay(见本节与第 5 节)。本节仅描述“接收路径”的处理流程: + +1. 从包尾读取 `nonce12`,解析出 `(epoch, seq)`。 +2. 选择对应 epoch 的 data key(允许短期保留多个 epoch,见 6.2)。 +3. 执行 anti-replay 检查(见 5)。 +4. AEAD 解密 payload(tag 校验失败视为丢包)。 + +--- + +## 4. 数据面:AEAD 封装 + +### 4.1 选择算法 + +本文以“尾部 tag(16) + nonce(12)”为基准,兼容: + +- AES-256-GCM(tag=16, nonce=12, key=32) +- ChaCha20-Poly1305(tag=16, nonce=12, key=32) + +### 4.2 密文布局(以 AEAD tail 形式描述) + +```text +wire_payload = ciphertext || tag16 || nonce12 +``` + +其中: + +- `ciphertext`:对原 payload 的加密结果(与原明文等长) +- `tag16`:AEAD tag(16B) +- `nonce12`:明文(12B),用于乱序解密与 anti-replay + +### 4.3 AAD + +默认:`AAD = empty`(与项目当前 ring encryptor 一致)。 + +扩展(可选):未来可把 `PeerManagerHeader` 的部分字段纳入 AAD(例如 from/to/packet_type/flags),以抵御“改 header 不改密文”的攻击面。该扩展不影响 nonce/epoch/seq 设计。 + +--- + +## 5. anti-replay(最小内存配置) + +### 5.1 默认窗口参数 + +- `window_size = 256` +- `keep_epochs = 2`(current + previous) +- `evict_idle_after = 30s`(某 epoch 长时间无包则回收其窗口与 key) + +### 5.2 ReplayWindow256(概念结构与大小) + +按“尽可能低内存”为目标,建议使用固定大小窗口(bitmap): + +```text +ReplayWindow256 { + max_seq: u64 // 8B + bitmap: [u8; 32] // 256bit = 32B +} // 合计 40B(按字段大小计,不含语言实现的对齐/额外元数据) +``` + +说明: + +- `bitmap` 的第 0 位表示 `max_seq` 是否已见;第 i 位表示 `max_seq - i` 是否已见。 +- 若 `seq > max_seq`:右移窗口并置位。 +- 若 `seq <= max_seq`:计算 `delta = max_seq - seq`,若 `delta >= 256` 丢弃(视为太旧);否则检查 bitmap 是否已置位,已置位则丢弃(重放),未置位则接受并置位。 + +### 5.3 ReplayState(每个对端、每个方向、每个 epoch) + +为减少内存,可用“固定 2 个 epoch 槽位”而非 HashMap: + +```text +EpochRxSlot { + epoch: u32 // 4B + window: ReplayWindow256 // 40B + last_rx_ms: u64 // 8B(用于 30s 淘汰) + valid: bool // 1B(实现细节) +} +``` + +每个对端、每个方向保留 2 个 `EpochRxSlot`: + +- current_epoch_slot +- previous_epoch_slot + +内存量级(按字段大小粗算): + +- 单方向:约 2 * (4 + 40 + 8 + 1) ≈ 106B +- 双方向:约 212B + +加上 epoch key 缓存(见 6.2)仍处于“每对端几百字节”级别。 + +--- + +## 6. epoch 与密钥派生/轮换 + +### 6.1 密钥层次 + +推荐将 Noise 仅用于握手与认证绑定,数据面 key 由一个会话根密钥 `root_key` 派生: + +```text +root_key: [u8; 32] // 会话根密钥材料 +``` + +随后对每个 epoch 与方向派生 traffic key: + +```text +k(epoch, dir) = HKDF(root_key, "et-traffic" || epoch_u32_be || dir_byte) +``` + +- `dir_byte`:发送方向标识(例如 0=tx, 1=rx) +- 输出长度:32B(用于 AES-256-GCM 或 ChaCha20-Poly1305) + +### 6.2 key 缓存(keep_epochs = 2) + +对每个对端 peer、每个方向,缓存 2 个 epoch 的 key: + +```text +EpochKeySlot { + epoch: u32 + key: [u8; 32] // 32B + valid: bool +} +``` + +接收时按 `(epoch)` 选择 key;若 epoch 是 current 或 previous 则可解密,否则丢弃(或可选:尝试少量临近 epoch,代价是试解密)。 + +### 6.3 轮换策略(默认无额外控制消息) + +为减少协议复杂度,默认策略: + +- 发送端在满足“包数阈值”或“时间阈值”时将 `epoch += 1` 并开始使用新 key。 +- 接收端不需要提前知道轮换点:从明文 `nonce12.epoch` 即可选择正确 key。 +- 接收端保留 `keep_epochs=2`,保证轮换期间乱序旧包仍可解密。 + +可选增强(未来): + +- 若希望更强一致性,可定义一个控制包通告 `epoch_advance`,但不是本方案的必要条件。 + +--- + +## 7. 握手层:Noise_XX + 角色/认证/会话根密钥 + +### 7.1 目标 + +在每条 PeerConn 建立时运行 Noise_XX 握手,完成: + +- 交换 `network_name`(尽早判断同网/共享节点) +- 完成共享节点 pubkey pinning(若配置) +- 完成 `network_secret_confirmed`(若双方都有 secret) +- 协商 PeerSession(join 或 create),并在需要时同步 `root_key` 与 `epoch` 起点 + +### 7.2 prologue + +prologue 固定为协议版本字符串,不包含 `network_name`,以避免跨 network_name 的连接被拒绝: + +```text +prologue = "easytier-peerconn-noise" +``` + +### 7.3 PeerSession:join / create / sync 规则 + +本文引入 Peer 级会话 `PeerSession`(每个对端 peer 一份),用于跨多条 PeerConn 复用数据面密钥与 anti-replay 状态。 + +#### 7.3.1 PeerSession 的身份字段 + +数据面不携带 `session_id`,因此会话的“索引键”是外层 `PeerManagerHeader.from_peer_id`(对端 peer_id)。但为了在握手阶段判断 join/create/sync,需要额外维护: + +```text +PeerSessionMeta { + session_generation: u32 // 4B,单调递增,会话根密钥 root_key 的版本号 + auth_level: u8 // 1B,对齐 secure_auth_level 的枚举语义 +} +``` + +语义: + +- `session_generation` 变化表示 `root_key` 发生轮换(create)。 +- `session_generation` 不变表示复用已有 `root_key`(join)。 + +#### 7.3.2 参与方角色 + +- Initiator:发起连接的一方(A) +- Responder:接收连接的一方(B) + +在本方案中,**Responder 对会话选择具有权威性**:最终使用哪一代 `root_key` 以 msg2 返回为准。 + +#### 7.3.3 Responder 的决策(核心) + +Responder 在收到 msg1 后,读取 Initiator 提供的 `a_session_generation`(可选)并与本地 PeerSession 进行对比,按以下优先级决策: + +1. **本地不存在 PeerSession**:执行 `CREATE`(生成新的 `root_key` 与 `session_generation=1`)。 +2. **本地存在 PeerSession 且 a_session_generation 与本地一致**:执行 `JOIN`(不轮换 root_key)。 +3. **本地存在 PeerSession 但 a_session_generation 缺失或不一致**:执行 `SYNC`(不轮换 root_key,但在 msg2 中携带当前 `root_key` 与 `session_generation`,使对端同步到本地会话)。 + +安全性与 DoS 说明: + +- 默认不允许对端通过握手触发 root_key 轮换(避免对端反复拨号导致会话重置)。 +- 只有在“本地不存在会话”或“本地策略显式要求轮换”(例如人工触发、密钥泄露处置)时才执行 `CREATE`。 + +#### 7.3.4 Initiator 的行为 + +Initiator 在握手开始前读取本地是否已有对端 PeerSession: + +- 若存在:在 msg1 中携带本地 `a_session_generation`。 +- 若不存在:msg1 不携带 `a_session_generation`。 + +Initiator 在收到 msg2 后: + +- 若 msg2 为 `JOIN`:继续使用本地 `root_key` 与 `session_generation`(不重置 epoch/seq)。 +- 若 msg2 为 `SYNC` / `CREATE` 且携带 `root_key`:用 msg2 携带的 `root_key` 覆盖本地会话,并将数据面计数器重置为 `initial_epoch`、`seq=0`,重放窗口清空。 + +### 7.4 握手 payload 编码:protobuf vs 固定布局 + +推荐使用 protobuf(pb)来编码 Noise 握手消息的 payload,原因: + +- 易演进(字段可选、可扩展、兼容旧版本) +- 项目内已广泛使用 pb(例如 `HandshakeRequest`) +- 开销可控:除去字符串外,核心字段均为固定长度 bytes(16/32/12),pb 仅增加少量 tag/len varint + +可选方案:固定布局。若你追求极致性能/可预测大小,可将同等字段按固定布局编码。本文以下默认以 protobuf 形式定义字段;固定布局可按同样字段直接平铺实现。 + +### 7.5 握手消息(Noise_XX 的 3 条消息) + +记: + +- msg1: A -> B(payload 明文) +- msg2: B -> A(payload 加密) +- msg3: A -> B(payload 加密) + +#### 7.5.1 pb 定义(字段类型与语义大小) + +下述为“协议级定义”(概念 proto),不要求立刻落入代码生成;实现可在 proto 中新增 message,或先在 Rust 侧用 prost 定义本地 message。 + +```proto +message PeerConnNoiseMsg1Pb { + uint32 version = 1; // varint + string a_network_name = 2; // len <= 64 bytes (建议约束) + optional uint32 a_session_generation = 3; // varint,可选 + bytes a_conn_id = 4; // 16B (UUID) +} + +enum PeerConnSessionActionPb { + JOIN = 0; // 不发送 root_key,表示“继续使用既有会话” + SYNC = 1; // 发送 root_key,用于对端同步到本地会话 + CREATE = 2; // 发送 root_key,表示本地新建会话 +} + +message PeerConnNoiseMsg2Pb { + string b_network_name = 1; // len <= 64 bytes + uint32 role_hint = 2; // 1=同网提示, 2=共享节点/外网提示 + + PeerConnSessionActionPb action = 3; // JOIN/SYNC/CREATE + uint32 b_session_generation = 4; // varint + + optional bytes root_key_32 = 5; // 32B,当 action=SYNC/CREATE 时必须存在 + uint32 initial_epoch = 6; // u32(编码为 varint 或 fixed32 均可),建议语义为 BE u32 值 + + bytes b_conn_id = 7; // 16B (UUID) + bytes a_conn_id_echo = 8; // 16B (UUID) +} + +message PeerConnNoiseMsg3Pb { + bytes a_conn_id_echo = 1; // 16B + bytes b_conn_id_echo = 2; // 16B + + // 可选:network_secret_confirmed 的 proof + optional bytes secret_proof_32 = 3; // 32B +} +``` + +字段语义大小(不含 pb tag/len): + +- UUID:16B +- root_key:32B +- secret_proof:32B +- initial_epoch:4B(逻辑大小;pb 编码本身为 varint/fixed32,wire 大小可变或 4B) + +#### 7.5.2 msg1 payload(A -> B,明文) + +```text +payload_bytes = PeerConnNoiseMsg1Pb.encode_to_vec() +``` + +说明: + +- 该 payload 为明文,因此不放 `root_key` 等敏感材料。 +- `a_network_name` 用于角色提示。 +- `a_session_generation` 用于 Responder 做 join/sync/create 决策。 +- `a_conn_id` 用于本次连接绑定(防拼接),将在 msg2/msg3 回显。 + +#### 7.5.3 msg2 payload(B -> A,加密) + +```text +payload_bytes = PeerConnNoiseMsg2Pb.encode_to_vec() +``` + +说明: + +- `action` 决定本次握手是否会更新会话根密钥: + - `JOIN`:不发送 `root_key_32`,表示“继续使用既有会话” + - `SYNC`:发送 `root_key_32`,用于对端同步到本地既有会话 + - `CREATE`:发送 `root_key_32`,表示本地创建新会话 +- `initial_epoch` 默认 0;若需要随机化,可设置为随机 u32,但需要接收端 key/窗口缓存支持更复杂的淘汰策略。 +- `a_conn_id_echo` 与 `b_conn_id` 用于连接绑定;msg3 将回显两者以确认双方看到同一组值。 + +#### 7.5.4 msg3 payload(A -> B,加密) + +```text +payload_bytes = PeerConnNoiseMsg3Pb.encode_to_vec() +``` + +`secret_proof_32`(可选)用于 `network_secret_confirmed`: + +```text +secret_proof = HMAC-SHA256( + key = derive(network_secret), + data = role_byte || handshake_hash +) +``` + +其中 `handshake_hash` 由 Noise 提供,`role_byte` 用于区分双方角色(client/server)。 + +### 7.6 pinning(共享节点) + +- 配置位置:`PeerConfig.peer_public_key`(base64,32B)。 +- 校验时机:Noise 握手结束后,A 读取 `remote_static_pubkey`,若配置了 pinned 则必须匹配,否则断连。 + +--- + +## 8. 角色判断与安全语义 + +- `network_name` 的比较足以用于 **角色提示**: + - `a_network_name == b_network_name`:同网提示 + - 否则:共享节点/外网提示 +- 但 `network_name` **不是认证锚点**。安全决策仅应基于: + - 共享节点 pinning 成功(`shared_node_pubkey_verified`) + - 或 network_secret_confirmed 成功(`network_secret_confirmed`) +- 在未完成上述任一认证前,连接为 `encrypted_unauthenticated`:仅保证机密性/完整性,不保证对端身份,存在 MITM 风险。 + +--- + +## 9. 与包尾 nonce 加密格式的关系 + +项目当前的 ring chacha20 加密实现使用随机 nonce 并将 nonce 明文附在包尾: +[ring_chacha20.rs](file:///data/project/EasyTier/easytier/src/peers/encrypt/ring_chacha20.rs#L69-L93) + +本文将随机 nonce 替换为结构化 `epoch||seq`: + +- 仍为 12B +- 仍明文放包尾 +- 但语义从“随机唯一”变为“可乱序解密 + 可 anti-replay + 可轮换” + +--- + +## 10. 默认参数汇总 + +- nonce:12B = epoch(u32 BE) + seq(u64 BE) +- tag:16B +- key:32B(AES-256-GCM 或 ChaCha20-Poly1305) +- replay window:256(bitmap 32B) +- keep_epochs:2(current + previous) +- evict_idle_after:30s diff --git a/easytier/locales/app.yml b/easytier/locales/app.yml index e4338236..eeb0c21a 100644 --- a/easytier/locales/app.yml +++ b/easytier/locales/app.yml @@ -232,6 +232,15 @@ core_clap: stun_servers_v6: en: "Override default STUN servers, IPv6; If configured but empty, IPv6 STUN servers are not used" zh-CN: "覆盖内置的默认 IPv6 STUN server 列表;如果设置了但是为空,则不使用 IPv6 STUN servers;如果没设置,则使用默认 IPv6 STUN server 列表" + secure_mode: + en: "if true, enable secure mode. default is false" + zh-CN: "如果为true,则启用安全模式。默认值为false" + local_private_key: + en: "local private key for secure mode. if not provided, a random key will be generated" + zh-CN: "安全模式下的本地私钥。如果未提供,则会随机生成一个密钥" + local_public_key: + en: "local public key for secure mode. if not provided, a random key will be generated, or use local private key to derive public key" + zh-CN: "安全模式下的本地公钥。如果未提供,则会随机生成一个密钥,或者使用本地私钥派生公钥" check_config: en: Check config validity without starting the network zh-CN: 检查配置文件的有效性并退出 diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index 772bb75f..c0f86105 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -14,7 +14,7 @@ use crate::{ instance::dns_server::DEFAULT_ET_DNS_ZONE, proto::{ acl::Acl, - common::{CompressionAlgoPb, PortForwardConfigPb, SocketType}, + common::{CompressionAlgoPb, PortForwardConfigPb, SecureModeConfig, SocketType}, }, tunnel::generate_digest_from_str, }; @@ -209,6 +209,9 @@ pub trait ConfigLoader: Send + Sync { fn get_stun_servers_v6(&self) -> Option>; fn set_stun_servers_v6(&self, servers: Option>); + fn get_secure_mode(&self) -> Option; + fn set_secure_mode(&self, secure_mode: Option); + fn dump(&self) -> String; } @@ -300,6 +303,7 @@ impl Default for NetworkIdentity { #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] pub struct PeerConfig { pub uri: url::Url, + pub peer_public_key: Option, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] @@ -407,6 +411,8 @@ struct Config { port_forward: Option>, + secure_mode: Option, + flags: Option>, #[serde(skip)] @@ -802,6 +808,14 @@ impl ConfigLoader for TomlConfigLoader { self.config.lock().unwrap().stun_servers_v6 = servers; } + fn get_secure_mode(&self) -> Option { + self.config.lock().unwrap().secure_mode.clone() + } + + fn set_secure_mode(&self, secure_mode: Option) { + self.config.lock().unwrap().secure_mode = secure_mode; + } + fn dump(&self) -> String { let default_flags_json = serde_json::to_string(&gen_default_flags()).unwrap(); let default_flags_hashmap = diff --git a/easytier/src/common/constants.rs b/easytier/src/common/constants.rs index 3544b139..9bd62c09 100644 --- a/easytier/src/common/constants.rs +++ b/easytier/src/common/constants.rs @@ -29,6 +29,9 @@ define_global_var!(MAX_DIRECT_CONNS_PER_PEER_IN_FOREIGN_NETWORK, u32, 3); define_global_var!(DIRECT_CONNECT_TO_PUBLIC_SERVER, bool, true); +// must make it true in future. +define_global_var!(HMAC_SECRET_DIGEST, bool, false); + pub const UDP_HOLE_PUNCH_CONNECTOR_SERVICE_ID: u32 = 2; pub const WIN_SERVICE_WORK_DIR_REG_KEY: &str = "SOFTWARE\\EasyTier\\Service\\WorkDir"; diff --git a/easytier/src/common/error.rs b/easytier/src/common/error.rs index 045d363f..87ef7ff4 100644 --- a/easytier/src/common/error.rs +++ b/easytier/src/common/error.rs @@ -48,6 +48,9 @@ pub enum Error { #[error("secret key error: {0}")] SecretKeyError(String), + + #[error("noise protocol error: {0}")] + NoiseError(#[from] snow::Error), } pub type Result = result::Result; diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 840b0c84..3294813a 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -15,6 +15,8 @@ use crate::proto::api::instance::PeerConnInfo; use crate::proto::common::{PeerFeatureFlag, PortForwardConfigPb}; use crate::proto::peer_rpc::PeerGroupInfo; use crossbeam::atomic::AtomicCell; +use hmac::{Hmac, Mac}; +use sha2::Sha256; use super::{ config::{ConfigLoader, Flags}, @@ -268,6 +270,15 @@ impl GlobalCtx { self.config.get_network_identity() } + pub fn get_secret_proof(&self, challenge: &[u8]) -> Option> { + let network_secret = self.get_network_identity().network_secret?; + let key = network_secret.as_bytes(); + let mut mac = Hmac::::new_from_slice(key).unwrap(); + mac.update(b"easytier secret proof"); + mac.update(challenge); + Some(mac) + } + pub fn get_network_name(&self) -> String { self.get_network_identity().network_name } diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index 3bcdce36..a5d543ef 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -186,7 +186,9 @@ impl DirectConnectorManagerData { .await?; // NOTICE: must add as directly connected tunnel - self.peer_manager.add_client_tunnel(ret, true).await + self.peer_manager + .add_client_tunnel_with_peer_id_hint(ret, true, Some(dst_peer_id)) + .await } async fn do_try_connect_to_ip(&self, dst_peer_id: PeerId, addr: String) -> Result<(), Error> { @@ -199,7 +201,8 @@ impl DirectConnectorManagerData { } else { timeout( std::time::Duration::from_secs(3), - self.peer_manager.try_direct_connect(connector), + self.peer_manager + .try_direct_connect_with_peer_id_hint(connector, Some(dst_peer_id)), ) .await?? }; diff --git a/easytier/src/core.rs b/easytier/src/core.rs index 350b9505..b91ed29f 100644 --- a/easytier/src/core.rs +++ b/easytier/src/core.rs @@ -19,15 +19,17 @@ use crate::{ defer, instance_manager::NetworkInstanceManager, launcher::add_proxy_network_to_config, - proto::common::CompressionAlgoPb, + proto::common::{CompressionAlgoPb, SecureModeConfig}, rpc_service::ApiRpcServer, tunnel::PROTO_PORT_OFFSET, utils::{init_logger, setup_panic_handler}, web_client, ShellType, }; use anyhow::Context; +use base64::{prelude::BASE64_STANDARD, Engine as _}; use cidr::IpCidr; use clap::{CommandFactory, Parser}; +use rand::rngs::OsRng; use rust_i18n::t; use tokio::io::AsyncReadExt; @@ -600,6 +602,29 @@ struct NetworkOptions { num_args = 0.. )] stun_servers_v6: Option>, + + #[arg( + long, + env = "ET_SECURE_MODE", + help = t!("core_clap.secure_mode").to_string(), + num_args = 0..=1, + default_missing_value = "true" + )] + secure_mode: Option, + + #[arg( + long, + env = "ET_LOCAL_PRIVATE_KEY", + help = t!("core_clap.local_private_key").to_string() + )] + local_private_key: Option, + + #[arg( + long, + env = "ET_LOCAL_PUBLIC_KEY", + help = t!("core_clap.local_public_key").to_string() + )] + local_public_key: Option, } #[derive(Parser, Debug)] @@ -723,6 +748,42 @@ impl NetworkOptions { false } + fn process_secure_mode_cfg(mut user_cfg: SecureModeConfig) -> anyhow::Result { + if !user_cfg.enabled { + return Ok(user_cfg); + } + + let private_key = if user_cfg.local_private_key.is_none() { + // if no private key, generate random one + let private = x25519_dalek::StaticSecret::random_from_rng(OsRng); + user_cfg.local_private_key = Some(BASE64_STANDARD.encode(private.clone().as_bytes())); + private + } else { + // check if private key is valid + user_cfg.private_key()? + }; + + let public = x25519_dalek::PublicKey::from(&private_key); + + match user_cfg.local_public_key { + None => { + user_cfg.local_public_key = Some(BASE64_STANDARD.encode(public.as_bytes())); + } + Some(ref user_pub) => { + let public = user_cfg.public_key()?; + if *user_pub != BASE64_STANDARD.encode(public.as_bytes()) { + return Err(anyhow::anyhow!( + "local public key {} does not match generated public key {}", + user_pub, + BASE64_STANDARD.encode(public.as_bytes()) + )); + } + } + } + + Ok(user_cfg) + } + fn merge_into(&self, cfg: &TomlConfigLoader) -> anyhow::Result<()> { if self.hostname.is_some() { cfg.set_hostname(self.hostname.clone()); @@ -760,6 +821,7 @@ impl NetworkOptions { uri: p .parse() .with_context(|| format!("failed to parse peer uri: {}", p))?, + peer_public_key: None, }); } cfg.set_peers(peers); @@ -820,6 +882,7 @@ impl NetworkOptions { uri: external_nodes.parse().with_context(|| { format!("failed to parse external node uri: {}", external_nodes) })?, + peer_public_key: None, }); cfg.set_peers(old_peers); } @@ -902,6 +965,17 @@ impl NetworkOptions { cfg.set_port_forwards(old); } + if let Some(secure_mode) = self.secure_mode { + if secure_mode { + let c = SecureModeConfig { + enabled: secure_mode, + local_private_key: self.local_private_key.clone(), + local_public_key: self.local_public_key.clone(), + }; + cfg.set_secure_mode(Some(Self::process_secure_mode_cfg(c)?)); + } + } + let mut f = cfg.get_flags(); if let Some(default_protocol) = &self.default_protocol { f.default_protocol = default_protocol.clone() diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 0e7ce218..4e6df74f 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -536,6 +536,7 @@ impl NetworkConfig { uri: public_server_url.parse().with_context(|| { format!("failed to parse public server uri: {}", public_server_url) })?, + peer_public_key: None, }]); } NetworkingMethod::Manual => { @@ -548,6 +549,7 @@ impl NetworkConfig { uri: peer_url .parse() .with_context(|| format!("failed to parse peer uri: {}", peer_url))?, + peer_public_key: None, }); } @@ -673,6 +675,8 @@ impl NetworkConfig { )); } + cfg.set_secure_mode(self.secure_mode.clone()); + let mut flags = gen_default_flags(); if let Some(latency_first) = self.latency_first { flags.latency_first = latency_first; @@ -897,6 +901,8 @@ impl NetworkConfig { result.mapped_listeners = mapped_listeners.iter().map(|l| l.to_string()).collect(); } + result.secure_mode = config.get_secure_mode(); + let flags = config.get_flags(); result.latency_first = Some(flags.latency_first); result.dev_name = Some(flags.dev_name.clone()); @@ -944,7 +950,7 @@ impl NetworkConfig { #[cfg(test)] mod tests { - use crate::common::config::ConfigLoader; + use crate::{common::config::ConfigLoader, proto::common::SecureModeConfig}; use rand::Rng; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; @@ -1018,7 +1024,10 @@ mod tests { let uri = format!("{}://127.0.0.1:{}", protocol, port) .parse() .unwrap(); - peers.push(crate::common::config::PeerConfig { uri }); + peers.push(crate::common::config::PeerConfig { + uri, + peer_public_key: None, + }); } config.set_peers(peers); @@ -1140,6 +1149,14 @@ mod tests { config.set_mapped_listeners(Some(mapped_listeners)); } + if rng.gen_bool(0.3) { + config.set_secure_mode(Some(SecureModeConfig { + enabled: true, + local_private_key: None, + local_public_key: None, + })); + } + if rng.gen_bool(0.9) { let mut flags = crate::common::config::gen_default_flags(); flags.latency_first = rng.gen_bool(0.5); diff --git a/easytier/src/peers/encrypt/aes_gcm.rs b/easytier/src/peers/encrypt/aes_gcm.rs index d39ed062..2201456b 100644 --- a/easytier/src/peers/encrypt/aes_gcm.rs +++ b/easytier/src/peers/encrypt/aes_gcm.rs @@ -84,6 +84,14 @@ impl Encryptor for AesGcmCipher { } fn encrypt(&self, zc_packet: &mut ZCPacket) -> Result<(), Error> { + self.encrypt_with_nonce(zc_packet, None) + } + + fn encrypt_with_nonce( + &self, + zc_packet: &mut ZCPacket, + nonce: Option<&[u8]>, + ) -> Result<(), Error> { let pm_header = zc_packet.peer_manager_header().unwrap(); if pm_header.is_encrypted() { tracing::warn!(?zc_packet, "packet is already encrypted"); @@ -91,16 +99,28 @@ impl Encryptor for AesGcmCipher { } let mut tail = AesGcmTail::default(); + if let Some(nonce) = nonce { + if nonce.len() != tail.nonce.len() { + return Err(Error::EncryptionFailed); + } + tail.nonce.copy_from_slice(nonce); + } let rs = match &self.cipher { AesGcmEnum::AES128GCM(aes_gcm) => { - let nonce = Aes128Gcm::generate_nonce(&mut OsRng); - tail.nonce.copy_from_slice(nonce.as_slice()); - aes_gcm.encrypt_in_place_detached(&nonce, &[], zc_packet.mut_payload()) + if nonce.is_none() { + let nonce = Aes128Gcm::generate_nonce(&mut OsRng); + tail.nonce.copy_from_slice(nonce.as_slice()); + } + let nonce = Nonce::from_slice(&tail.nonce); + aes_gcm.encrypt_in_place_detached(nonce, &[], zc_packet.mut_payload()) } AesGcmEnum::AES256GCM(aes_gcm) => { - let nonce = Aes256Gcm::generate_nonce(&mut OsRng); - tail.nonce.copy_from_slice(nonce.as_slice()); - aes_gcm.encrypt_in_place_detached(&nonce, &[], zc_packet.mut_payload()) + if nonce.is_none() { + let nonce = Aes256Gcm::generate_nonce(&mut OsRng); + tail.nonce.copy_from_slice(nonce.as_slice()); + } + let nonce = Nonce::from_slice(&tail.nonce); + aes_gcm.encrypt_in_place_detached(nonce, &[], zc_packet.mut_payload()) } }; @@ -122,8 +142,9 @@ impl Encryptor for AesGcmCipher { mod tests { use crate::{ peers::encrypt::{aes_gcm::AesGcmCipher, Encryptor}, - tunnel::packet_def::{ZCPacket, AES_GCM_ENCRYPTION_RESERVED}, + tunnel::packet_def::{AesGcmTail, ZCPacket, AES_GCM_ENCRYPTION_RESERVED}, }; + use zerocopy::FromBytes; #[test] fn test_aes_gcm_cipher() { @@ -143,4 +164,32 @@ mod tests { assert_eq!(packet.payload(), text); assert!(!packet.peer_manager_header().unwrap().is_encrypted()); } + + #[test] + fn test_aes_gcm_cipher_with_nonce() { + let key = [7u8; 16]; + let cipher = AesGcmCipher::new_128(key); + let text = b"Hello"; + let nonce = [3u8; 12]; + + let mut packet1 = ZCPacket::new_with_payload(text); + packet1.fill_peer_manager_hdr(0, 0, 0); + cipher + .encrypt_with_nonce(&mut packet1, Some(&nonce)) + .unwrap(); + + let mut packet2 = ZCPacket::new_with_payload(text); + packet2.fill_peer_manager_hdr(0, 0, 0); + cipher + .encrypt_with_nonce(&mut packet2, Some(&nonce)) + .unwrap(); + + assert_eq!(packet1.payload(), packet2.payload()); + + let tail = AesGcmTail::ref_from_suffix(packet1.payload()).unwrap(); + assert_eq!(tail.nonce, nonce); + + cipher.decrypt(&mut packet1).unwrap(); + assert_eq!(packet1.payload(), text); + } } diff --git a/easytier/src/peers/encrypt/mod.rs b/easytier/src/peers/encrypt/mod.rs index 76a5c2a4..76d42b27 100644 --- a/easytier/src/peers/encrypt/mod.rs +++ b/easytier/src/peers/encrypt/mod.rs @@ -30,6 +30,13 @@ pub enum Error { pub trait Encryptor: Send + Sync + 'static { fn encrypt(&self, zc_packet: &mut ZCPacket) -> Result<(), Error>; + fn encrypt_with_nonce( + &self, + zc_packet: &mut ZCPacket, + _nonce: Option<&[u8]>, + ) -> Result<(), Error> { + self.encrypt(zc_packet) + } fn decrypt(&self, zc_packet: &mut ZCPacket) -> Result<(), Error>; } diff --git a/easytier/src/peers/encrypt/openssl_cipher.rs b/easytier/src/peers/encrypt/openssl_cipher.rs index 9d6db816..fc7f70d9 100644 --- a/easytier/src/peers/encrypt/openssl_cipher.rs +++ b/easytier/src/peers/encrypt/openssl_cipher.rs @@ -142,6 +142,14 @@ impl Encryptor for OpenSslCipher { } fn encrypt(&self, zc_packet: &mut ZCPacket) -> Result<(), Error> { + self.encrypt_with_nonce(zc_packet, None) + } + + fn encrypt_with_nonce( + &self, + zc_packet: &mut ZCPacket, + nonce: Option<&[u8]>, + ) -> Result<(), Error> { let pm_header = zc_packet.peer_manager_header().unwrap(); if pm_header.is_encrypted() { tracing::warn!(?zc_packet, "packet is already encrypted"); @@ -153,7 +161,14 @@ impl Encryptor for OpenSslCipher { let nonce_size = self.get_nonce_size(); let mut tail = OpenSslTail::default(); - rand::thread_rng().fill_bytes(&mut tail.nonce[..nonce_size]); + if let Some(nonce) = nonce { + if nonce.len() != nonce_size { + return Err(Error::EncryptionFailed); + } + tail.nonce[..nonce_size].copy_from_slice(nonce); + } else { + rand::thread_rng().fill_bytes(&mut tail.nonce[..nonce_size]); + } let mut encrypter = Crypter::new(cipher, Mode::Encrypt, key, Some(&tail.nonce[..nonce_size])) @@ -198,6 +213,7 @@ mod tests { peers::encrypt::{openssl_cipher::OpenSslCipher, Encryptor}, tunnel::packet_def::ZCPacket, }; + use zerocopy::FromBytes; use super::OPENSSL_ENCRYPTION_RESERVED; @@ -220,6 +236,37 @@ mod tests { assert!(!packet.peer_manager_header().unwrap().is_encrypted()); } + #[test] + fn test_openssl_aes128_gcm_with_nonce() { + let key = [7u8; 16]; + let cipher = OpenSslCipher::new_aes128_gcm(key); + let text = b"Hello"; + let nonce = [3u8; 12]; + + let mut packet1 = ZCPacket::new_with_payload(text); + packet1.fill_peer_manager_hdr(0, 0, 0); + cipher + .encrypt_with_nonce(&mut packet1, Some(&nonce)) + .unwrap(); + + let mut packet2 = ZCPacket::new_with_payload(text); + packet2.fill_peer_manager_hdr(0, 0, 0); + cipher + .encrypt_with_nonce(&mut packet2, Some(&nonce)) + .unwrap(); + + assert_eq!(packet1.payload(), packet2.payload()); + assert!(packet1.payload().len() > text.len() + OPENSSL_ENCRYPTION_RESERVED); + + let tail = super::OpenSslTail::ref_from_suffix(packet1.payload()) + .unwrap() + .clone(); + assert_eq!(&tail.nonce[..nonce.len()], nonce); + + cipher.decrypt(&mut packet1).unwrap(); + assert_eq!(packet1.payload(), text); + } + #[test] fn test_openssl_chacha20() { let key = [0u8; 32]; diff --git a/easytier/src/peers/encrypt/ring_aes_gcm.rs b/easytier/src/peers/encrypt/ring_aes_gcm.rs index c0c77624..74a961bd 100644 --- a/easytier/src/peers/encrypt/ring_aes_gcm.rs +++ b/easytier/src/peers/encrypt/ring_aes_gcm.rs @@ -93,6 +93,14 @@ impl Encryptor for AesGcmCipher { } fn encrypt(&self, zc_packet: &mut ZCPacket) -> Result<(), Error> { + self.encrypt_with_nonce(zc_packet, None) + } + + fn encrypt_with_nonce( + &self, + zc_packet: &mut ZCPacket, + nonce: Option<&[u8]>, + ) -> Result<(), Error> { let pm_header = zc_packet.peer_manager_header().unwrap(); if pm_header.is_encrypted() { tracing::warn!(?zc_packet, "packet is already encrypted"); @@ -100,7 +108,14 @@ impl Encryptor for AesGcmCipher { } let mut tail = AesGcmTail::default(); - rand::thread_rng().fill_bytes(&mut tail.nonce); + if let Some(nonce) = nonce { + if nonce.len() != tail.nonce.len() { + return Err(Error::EncryptionFailed); + } + tail.nonce.copy_from_slice(nonce); + } else { + rand::thread_rng().fill_bytes(&mut tail.nonce); + } let nonce = aead::Nonce::assume_unique_for_key(tail.nonce); let rs = match &self.cipher { @@ -137,8 +152,9 @@ impl Encryptor for AesGcmCipher { mod tests { use crate::{ peers::encrypt::{ring_aes_gcm::AesGcmCipher, Encryptor}, - tunnel::packet_def::{ZCPacket, AES_GCM_ENCRYPTION_RESERVED}, + tunnel::packet_def::{AesGcmTail, ZCPacket, AES_GCM_ENCRYPTION_RESERVED}, }; + use zerocopy::FromBytes; #[test] fn test_aes_gcm_cipher() { @@ -158,4 +174,32 @@ mod tests { assert_eq!(packet.payload(), text); assert!(!packet.peer_manager_header().unwrap().is_encrypted()); } + + #[test] + fn test_aes_gcm_cipher_with_nonce() { + let key = [7u8; 16]; + let cipher = AesGcmCipher::new_128(key); + let text = b"Hello"; + let nonce = [3u8; 12]; + + let mut packet1 = ZCPacket::new_with_payload(text); + packet1.fill_peer_manager_hdr(0, 0, 0); + cipher + .encrypt_with_nonce(&mut packet1, Some(&nonce)) + .unwrap(); + + let mut packet2 = ZCPacket::new_with_payload(text); + packet2.fill_peer_manager_hdr(0, 0, 0); + cipher + .encrypt_with_nonce(&mut packet2, Some(&nonce)) + .unwrap(); + + assert_eq!(packet1.payload(), packet2.payload()); + + let tail = AesGcmTail::ref_from_suffix(packet1.payload()).unwrap(); + assert_eq!(tail.nonce, nonce); + + cipher.decrypt(&mut packet1).unwrap(); + assert_eq!(packet1.payload(), text); + } } diff --git a/easytier/src/peers/encrypt/ring_chacha20.rs b/easytier/src/peers/encrypt/ring_chacha20.rs index 687d02f9..8acc7fd9 100644 --- a/easytier/src/peers/encrypt/ring_chacha20.rs +++ b/easytier/src/peers/encrypt/ring_chacha20.rs @@ -67,6 +67,14 @@ impl Encryptor for RingChaCha20Cipher { } fn encrypt(&self, zc_packet: &mut ZCPacket) -> Result<(), Error> { + self.encrypt_with_nonce(zc_packet, None) + } + + fn encrypt_with_nonce( + &self, + zc_packet: &mut ZCPacket, + nonce: Option<&[u8]>, + ) -> Result<(), Error> { let pm_header = zc_packet.peer_manager_header().unwrap(); if pm_header.is_encrypted() { tracing::warn!(?zc_packet, "packet is already encrypted"); @@ -74,7 +82,14 @@ impl Encryptor for RingChaCha20Cipher { } let mut tail = ChaCha20Poly1305Tail::default(); - rand::thread_rng().fill_bytes(&mut tail.nonce); + if let Some(nonce) = nonce { + if nonce.len() != tail.nonce.len() { + return Err(Error::EncryptionFailed); + } + tail.nonce.copy_from_slice(nonce); + } else { + rand::thread_rng().fill_bytes(&mut tail.nonce); + } let nonce = Nonce::assume_unique_for_key(tail.nonce); let rs = @@ -100,6 +115,7 @@ mod tests { peers::encrypt::{ring_chacha20::RingChaCha20Cipher, Encryptor}, tunnel::packet_def::ZCPacket, }; + use zerocopy::FromBytes; use super::CHACHA20_POLY1305_ENCRYPTION_RESERVED; @@ -122,4 +138,33 @@ mod tests { assert_eq!(packet.payload(), text); assert!(!packet.peer_manager_header().unwrap().is_encrypted()); } + + #[test] + fn test_ring_chacha20_cipher_with_nonce() { + let key = [9u8; 32]; + let cipher = RingChaCha20Cipher::new(key); + let text = b"Hello"; + let nonce = [5u8; 12]; + + let mut packet1 = ZCPacket::new_with_payload(text); + packet1.fill_peer_manager_hdr(0, 0, 0); + cipher + .encrypt_with_nonce(&mut packet1, Some(&nonce)) + .unwrap(); + + let mut packet2 = ZCPacket::new_with_payload(text); + packet2.fill_peer_manager_hdr(0, 0, 0); + cipher + .encrypt_with_nonce(&mut packet2, Some(&nonce)) + .unwrap(); + + assert_eq!(packet1.payload(), packet2.payload()); + + let tail = super::ChaCha20Poly1305Tail::ref_from_suffix(packet1.payload()).unwrap(); + assert_eq!(tail.nonce, nonce); + + cipher.decrypt(&mut packet1).unwrap(); + assert_eq!(packet1.payload(), text); + assert!(!packet1.peer_manager_header().unwrap().is_encrypted()); + } } diff --git a/easytier/src/peers/mod.rs b/easytier/src/peers/mod.rs index d9893dcb..9b3b0ee0 100644 --- a/easytier/src/peers/mod.rs +++ b/easytier/src/peers/mod.rs @@ -2,7 +2,6 @@ mod graph_algo; pub mod acl_filter; pub mod peer; -// pub mod peer_conn; pub mod peer_conn; pub mod peer_conn_ping; pub mod peer_manager; @@ -10,6 +9,7 @@ pub mod peer_map; pub mod peer_ospf_route; pub mod peer_rpc; pub mod peer_rpc_service; +pub mod peer_session; pub mod route_trait; pub mod rpc_service; diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index 6733d2d5..3e982d7e 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -238,12 +238,12 @@ impl Drop for Peer { #[cfg(test)] mod tests { - + use std::sync::Arc; use tokio::time::timeout; use crate::{ common::{global_ctx::tests::get_mock_global_ctx, new_peer_id}, - peers::{create_packet_recv_chan, peer_conn::PeerConn}, + peers::{create_packet_recv_chan, peer_conn::PeerConn, peer_session::PeerSessionStore}, tunnel::ring::create_ring_tunnel_pair, }; @@ -257,11 +257,20 @@ mod tests { let local_peer = Peer::new(new_peer_id(), local_packet_send, global_ctx.clone()); let remote_peer = Peer::new(new_peer_id(), remote_packet_send, global_ctx.clone()); + let ps = Arc::new(PeerSessionStore::new()); let (local_tunnel, remote_tunnel) = create_ring_tunnel_pair(); - let mut local_peer_conn = - PeerConn::new(local_peer.peer_node_id, global_ctx.clone(), local_tunnel); - let mut remote_peer_conn = - PeerConn::new(remote_peer.peer_node_id, global_ctx.clone(), remote_tunnel); + let mut local_peer_conn = PeerConn::new( + local_peer.peer_node_id, + global_ctx.clone(), + local_tunnel, + ps.clone(), + ); + let mut remote_peer_conn = PeerConn::new( + remote_peer.peer_node_id, + global_ctx.clone(), + remote_tunnel, + ps.clone(), + ); assert!(!local_peer_conn.handshake_done()); assert!(!remote_peer_conn.handshake_done()); diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index c6f2c002..4bac1bd3 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -9,8 +9,12 @@ use std::{ }; use arc_swap::ArcSwapOption; +use crossbeam::atomic::AtomicCell; use futures::{StreamExt, TryFutureExt}; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; +use base64::Engine as _; +use hmac::Mac; use prost::Message; use tokio::{ @@ -22,6 +26,8 @@ use tokio::{ use tracing::Instrument; use zerocopy::AsBytes; +use snow::{params::NoiseParams, HandshakeState}; + use crate::{ common::{ config::{NetworkIdentity, NetworkSecretDigest}, @@ -31,27 +37,183 @@ use crate::{ stats_manager::{CounterHandle, LabelSet, LabelType, MetricName}, PeerId, }, + peers::peer_session::{PeerSessionStore, SessionKey, UpsertResponderSessionReturn}, proto::{ api::instance::{PeerConnInfo, PeerConnStats}, - common::TunnelInfo, - peer_rpc::HandshakeRequest, + common::{SecureModeConfig, TunnelInfo}, + peer_rpc::{ + HandshakeRequest, PeerConnNoiseMsg1Pb, PeerConnNoiseMsg2Pb, PeerConnNoiseMsg3Pb, + PeerConnSessionActionPb, SecureAuthLevel, + }, }, tunnel::{ - filter::{StatsRecorderTunnelFilter, TunnelFilter, TunnelWithFilter}, + filter::{StatsRecorderTunnelFilter, TunnelFilter, TunnelFilterChain, TunnelWithFilter}, mpsc::{MpscTunnel, MpscTunnelSender}, packet_def::{PacketType, ZCPacket}, stats::{Throughput, WindowLatency}, Tunnel, TunnelError, ZCPacketStream, }, + use_global_var, }; -use super::{peer_conn_ping::PeerConnPinger, PacketRecvChan}; +use super::{ + peer_conn_ping::PeerConnPinger, + peer_session::{PeerSession, PeerSessionAction}, + PacketRecvChan, +}; pub type PeerConnId = uuid::Uuid; const MAGIC: u32 = 0xd1e1a5e1; const VERSION: u32 = 1; +/// The proof of client secret. +#[derive(Debug)] +struct SecretProof { + challenge: Vec, + proof: Vec, +} + +/// The result of noise handshake. +#[derive(Debug)] +struct NoiseHandshakeResult { + peer_id: PeerId, + session: Arc, + local_static_pubkey: Vec, + remote_static_pubkey: Vec, + handshake_hash: Vec, + secure_auth_level: SecureAuthLevel, + remote_network_name: String, + + secret_digest: Vec, + + // foreign network manager use this to verify peer. + // the challenge will be sent to authorized peer and compare the proof against it. + client_secret_proof: Option, + + my_encrypt_algo: String, + remote_encrypt_algo: String, +} + +#[derive(Clone)] +struct PeerSessionTunnelFilter { + enabled: bool, + my_peer_id: Arc>, + peer_id: Arc>>, + session: Arc>>>, +} + +impl PeerSessionTunnelFilter { + fn new(enabled: bool) -> Self { + Self { + enabled, + my_peer_id: Arc::new(AtomicCell::new(PeerId::default())), + peer_id: Arc::new(AtomicCell::new(None)), + session: Arc::new(std::sync::Mutex::new(None)), + } + } + + fn new_with_peer(my_peer_id: PeerId, enabled: bool) -> Self { + Self { + enabled, + my_peer_id: Arc::new(AtomicCell::new(my_peer_id)), + peer_id: Arc::new(AtomicCell::new(None)), + session: Arc::new(std::sync::Mutex::new(None)), + } + } + + fn set_my_peer_id(&self, my_peer_id: PeerId) { + self.my_peer_id.store(my_peer_id); + } + + fn set_peer_id(&self, peer_id: PeerId) { + self.peer_id.store(Some(peer_id)); + } + + fn set_session(&self, session: Arc) { + *self.session.lock().unwrap() = Some(session); + } + + fn should_skip_encrypt(&self, hdr: &crate::tunnel::packet_def::PeerManagerHeader) -> bool { + hdr.packet_type == PacketType::NoiseHandshakeMsg1 as u8 + || hdr.packet_type == PacketType::NoiseHandshakeMsg2 as u8 + || hdr.packet_type == PacketType::NoiseHandshakeMsg3 as u8 + || hdr.packet_type == PacketType::Ping as u8 + || hdr.packet_type == PacketType::Pong as u8 + } +} + +impl TunnelFilter for PeerSessionTunnelFilter { + type FilterOutput = (); + + fn before_send(&self, mut data: crate::tunnel::SinkItem) -> Option { + if !self.enabled { + return Some(data); + } + + let Some(hdr) = data.peer_manager_header() else { + return Some(data); + }; + + if self.should_skip_encrypt(hdr) { + return Some(data); + } + + let Some(peer_id) = self.peer_id.load() else { + return Some(data); + }; + + let mut guard = self.session.lock().unwrap(); + let Some(session) = guard.as_mut() else { + return Some(data); + }; + + let my_peer_id = self.my_peer_id.load(); + session + .encrypt_payload(my_peer_id, peer_id, &mut data) + .ok()?; + + Some(data) + } + + fn after_received(&self, data: crate::tunnel::StreamItem) -> Option { + if !self.enabled { + return Some(data); + } + + let mut data = match data { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }; + + let Some(hdr) = data.peer_manager_header() else { + return Some(Ok(data)); + }; + + if self.should_skip_encrypt(hdr) { + return Some(Ok(data)); + } + + let from_peer_id = hdr.from_peer_id.get(); + if from_peer_id == 0 { + return Some(Ok(data)); + } + self.peer_id.store(Some(from_peer_id)); + + let mut guard = self.session.lock().unwrap(); + let Some(session) = guard.as_mut() else { + return Some(Ok(data)); + }; + + let my_peer_id = self.my_peer_id.load(); + let _ = session.decrypt_payload(from_peer_id, my_peer_id, &mut data); + + Some(Ok(data)) + } + + fn filter_output(&self) {} +} + pub struct PeerConnCloseNotify { conn_id: PeerConnId, sender: Arc>>>, @@ -98,8 +260,13 @@ pub struct PeerConn { conn_id: PeerConnId, my_peer_id: PeerId, + peer_id_hint: Option, global_ctx: ArcGlobalCtx, + secure_mode_cfg: Option, + session_filter: PeerSessionTunnelFilter, + noise_handshake_result: Option, + tunnel: Arc>>, sink: MpscTunnelSender, recv: Mutex>>>, @@ -122,6 +289,9 @@ pub struct PeerConn { loss_rate_stats: Arc, counters: ArcSwapOption, + + peer_session_store: Arc, + my_encrypt_algo: String, } impl Debug for PeerConn { @@ -135,25 +305,57 @@ impl Debug for PeerConn { } impl PeerConn { - pub fn new(my_peer_id: PeerId, global_ctx: ArcGlobalCtx, tunnel: Box) -> Self { + pub fn new( + my_peer_id: PeerId, + global_ctx: ArcGlobalCtx, + tunnel: Box, + peer_session_store: Arc, + ) -> Self { + Self::new_with_peer_id_hint(my_peer_id, global_ctx, tunnel, None, peer_session_store) + } + + pub fn new_with_peer_id_hint( + my_peer_id: PeerId, + global_ctx: ArcGlobalCtx, + tunnel: Box, + peer_id_hint: Option, + peer_session_store: Arc, + ) -> Self { + let flags = global_ctx.get_flags(); let tunnel_info = tunnel.info(); let (ctrl_sender, _ctrl_receiver) = broadcast::channel(8); + let secure_mode_cfg = global_ctx.config.get_secure_mode(); + let session_filter = PeerSessionTunnelFilter::new_with_peer( + my_peer_id, + secure_mode_cfg + .as_ref() + .map(|cfg| cfg.enabled) + .unwrap_or(false), + ); + let peer_conn_tunnel_filter = StatsRecorderTunnelFilter::new(); let throughput = peer_conn_tunnel_filter.filter_output(); - let peer_conn_tunnel = TunnelWithFilter::new(tunnel, peer_conn_tunnel_filter); + let filter_chain = TunnelFilterChain::new(session_filter.clone(), peer_conn_tunnel_filter); + let peer_conn_tunnel = TunnelWithFilter::new(tunnel, filter_chain); let mut mpsc_tunnel = MpscTunnel::new(peer_conn_tunnel, Some(Duration::from_secs(7))); let (recv, sink) = (mpsc_tunnel.get_stream(), mpsc_tunnel.get_sink()); let conn_id = PeerConnId::new_v4(); + let my_encrypt_algo = flags.encryption_algorithm; PeerConn { conn_id, my_peer_id, + peer_id_hint, global_ctx, + secure_mode_cfg, + session_filter, + noise_handshake_result: None, + tunnel: Arc::new(Mutex::new(Box::new(defer::Defer::new(move || { mpsc_tunnel.close() })))), @@ -177,9 +379,35 @@ impl PeerConn { loss_rate_stats: Arc::new(AtomicU32::new(0)), counters: ArcSwapOption::new(None), + + peer_session_store, + my_encrypt_algo, } } + fn get_peer_session_store(&self) -> &Arc { + &self.peer_session_store + } + + pub fn is_secure_mode_enabled(&self) -> bool { + self.secure_mode_cfg + .as_ref() + .map(|cfg| cfg.enabled) + .unwrap_or(false) + } + + // pri, pub + fn get_keypair(&self) -> Result<(Vec, Vec), Error> { + let cfg = self + .secure_mode_cfg + .as_ref() + .ok_or_else(|| Error::WaitRespError("secure mode config not set".to_owned()))?; + Ok(( + cfg.private_key()?.as_bytes().to_vec(), + cfg.public_key()?.as_bytes().to_vec(), + )) + } + pub fn get_conn_id(&self) -> PeerConnId { self.conn_id } @@ -232,7 +460,7 @@ impl PeerConn { Error::WaitRespError(format!("decode handshake response error: {:?}", e)) })?; - if rsp.network_secret_digrest.len() != std::mem::size_of::() { + if rsp.network_secret_digest.len() != std::mem::size_of::() { return Err(Error::WaitRespError( "invalid network secret digest".to_owned(), )); @@ -273,11 +501,11 @@ impl PeerConn { // only send network secret digest if the network is the same if send_secret_digest { - req.network_secret_digrest + req.network_secret_digest .extend_from_slice(&network.network_secret_digest.unwrap_or_default()); } else { // fill zero - req.network_secret_digrest + req.network_secret_digest .extend_from_slice(&[0u8; std::mem::size_of::()]); } @@ -300,24 +528,582 @@ impl PeerConn { Ok(()) } + fn decode_handshake_packet(pkt: &ZCPacket) -> Result { + let Some(peer_mgr_hdr) = pkt.peer_manager_header() else { + return Err(Error::WaitRespError( + "unexpected packet: cannot decode peer manager hdr".to_owned(), + )); + }; + + if peer_mgr_hdr.packet_type != PacketType::HandShake as u8 { + return Err(Error::WaitRespError(format!( + "unexpected packet type: {:?}", + peer_mgr_hdr.packet_type + ))); + } + + let rsp = HandshakeRequest::decode(pkt.payload()).map_err(|e| { + Error::WaitRespError(format!("decode handshake response error: {:?}", e)) + })?; + + if rsp.network_secret_digest.len() != std::mem::size_of::() { + return Err(Error::WaitRespError( + "invalid network secret digest".to_owned(), + )); + } + + Ok(rsp) + } + + async fn recv_next_peer_manager_packet( + &self, + expected_pkt_type: Option, + ) -> Result { + let mut locked = self.recv.lock().await; + let recv = locked.as_mut().unwrap(); + + loop { + let Some(ret) = recv.next().await else { + return Err(Error::WaitRespError( + "conn closed during wait handshake response".to_owned(), + )); + }; + let pkt = match ret { + Ok(v) => v, + Err(e) => { + return Err(Error::WaitRespError(format!( + "conn recv error during wait handshake response, err: {:?}", + e + ))) + } + }; + + let Some(peer_mgr_hdr) = pkt.peer_manager_header() else { + continue; + }; + + if expected_pkt_type.is_none() + || peer_mgr_hdr.packet_type == *expected_pkt_type.as_ref().unwrap() as u8 + { + return Ok(pkt); + } + } + } + + fn decode_b64_32(input: &str) -> Result, Error> { + let decoded = BASE64_STANDARD + .decode(input) + .map_err(|e| Error::WaitRespError(format!("base64 decode failed: {e:?}")))?; + if decoded.len() != 32 { + return Err(Error::WaitRespError(format!( + "invalid key length: {}", + decoded.len() + ))); + } + Ok(decoded) + } + + fn get_pinned_remote_static_pubkey_b64(&self) -> Option { + let remote_url_str = self + .tunnel_info + .as_ref() + .and_then(|t| t.remote_addr.as_ref()) + .map(|u| u.url.as_str())?; + let remote_url: url::Url = remote_url_str.parse().ok()?; + + self.global_ctx + .config + .get_peers() + .into_iter() + .find(|p| p.uri == remote_url) + .and_then(|p| p.peer_public_key) + } + + async fn send_noise_msg( + &self, + pb: Msg, + packet_type: PacketType, + remote_peer_id: PeerId, + hs: &mut snow::HandshakeState, + ) -> Result<(), Error> { + tracing::info!( + "send noise msg: {:?}, packet_type: {:?}, from: {:?}, to: {:?}", + pb, + packet_type, + self.my_peer_id, + remote_peer_id + ); + let payload = pb.encode_to_vec(); + let mut msg = vec![0u8; 4096]; + let msg_len = hs + .write_message(&payload, &mut msg) + .map_err(|e| Error::WaitRespError(format!("noise write msg1 failed: {e:?}")))?; + let mut pkt = ZCPacket::new_with_payload(&msg[..msg_len]); + pkt.fill_peer_manager_hdr(self.my_peer_id, remote_peer_id, packet_type as u8); + Ok(self.sink.send(pkt).await?) + } + + async fn do_noise_handshake_as_client(&self) -> Result { + let prologue = b"easytier-peerconn-noise".to_vec(); + + let params: NoiseParams = "Noise_XX_25519_ChaChaPoly_SHA256" + .parse() + .map_err(|e| Error::WaitRespError(format!("parse noise params failed: {e:?}")))?; + + let pinned_remote_pubkey = self + .get_pinned_remote_static_pubkey_b64() + .map(|v| Self::decode_b64_32(&v)) + .transpose()?; + + let builder = snow::Builder::new(params); + let (local_private_key, local_static_pubkey) = self.get_keypair()?; + + let network = self.global_ctx.get_network_identity(); + let a_session_generation = self + .peer_id_hint + .and_then(|peer_id| { + self.get_peer_session_store() + .get(&SessionKey::new(network.network_name.clone(), peer_id)) + }) + .map(|s| s.session_generation()); + + let a_conn_id = uuid::Uuid::new_v4(); + let msg1_pb = PeerConnNoiseMsg1Pb { + version: VERSION, + a_network_name: network.network_name.clone(), + a_session_generation, + a_conn_id: Some(a_conn_id.into()), + client_encryption_algorithm: self.my_encrypt_algo.clone(), + }; + + let mut hs = builder + .prologue(&prologue)? + .local_private_key(&local_private_key)? + .build_initiator()?; + + let mut secure_auth_level = SecureAuthLevel::EncryptedUnauthenticated; + + self.send_noise_msg( + msg1_pb, + PacketType::NoiseHandshakeMsg1, + PeerId::default(), + &mut hs, + ) + .await?; + + let server_handshake_hash = hs.get_handshake_hash().to_vec(); + + let msg2 = timeout( + Duration::from_secs(5), + self.recv_next_peer_manager_packet(Some(PacketType::NoiseHandshakeMsg2)), + ) + .await??; + let remote_peer_id = msg2.get_src_peer_id().expect("missing src peer id"); + if let Some(hint) = self.peer_id_hint { + if hint != remote_peer_id { + return Err(Error::WaitRespError("peer_id mismatch".to_owned())); + } + } + let msg2_pb = Self::decode_handshake_message::( + PacketType::NoiseHandshakeMsg2, + Some(&mut hs), + msg2, + )?; + if msg2_pb.a_conn_id_echo != Some(a_conn_id.into()) { + return Err(Error::WaitRespError( + "noise msg2 conn_id_echo mismatch".to_owned(), + )); + } + let action = PeerConnSessionActionPb::try_from(msg2_pb.action) + .map_err(|_| Error::WaitRespError("invalid session action".to_owned()))?; + let remote_network_name = msg2_pb.b_network_name.clone(); + + if remote_network_name == network.network_name { + if msg2_pb.role_hint != 1 { + return Err(Error::WaitRespError( + "role_hint must be 1 when network_name is same".to_owned(), + )); + } + let Some(secret_proof_32) = msg2_pb.secret_proof_32 else { + return Err(Error::WaitRespError( + "secret_proof_32 must be present when role_hint is 1".to_owned(), + )); + }; + let verify_result = self + .global_ctx + .get_secret_proof(&server_handshake_hash) + .map(|mac| mac.verify_slice(&secret_proof_32).is_ok()); + if verify_result != Some(true) { + return Err(Error::WaitRespError(format!( + "secret_proof_32 verify failed: {verify_result:?}" + ))); + } + + secure_auth_level = secure_auth_level.max(SecureAuthLevel::NetworkSecretConfirmed); + } + + let handshake_hash_for_proof = hs.get_handshake_hash().to_vec(); + let secret_proof_32 = self + .global_ctx + .get_secret_proof(&handshake_hash_for_proof) + .map(|mac| mac.finalize().into_bytes().to_vec()); + + let secret_digest = if use_global_var!(HMAC_SECRET_DIGEST) { + self.global_ctx + .get_secret_proof("digest".as_bytes()) + .map(|mac| mac.finalize().into_bytes().to_vec()) + .unwrap_or_default() + } else { + network.network_secret_digest.unwrap_or_default().to_vec() + }; + + let msg3_pb = PeerConnNoiseMsg3Pb { + a_conn_id_echo: Some(a_conn_id.into()), + b_conn_id_echo: msg2_pb.b_conn_id, + secret_proof_32, + secret_digest: secret_digest.clone(), + }; + self.send_noise_msg( + msg3_pb, + PacketType::NoiseHandshakeMsg3, + remote_peer_id, + &mut hs, + ) + .await?; + + let remote_static = hs + .get_remote_static() + .map(|x: &[u8]| x.to_vec()) + .unwrap_or_default(); + + if let Some(pinned) = pinned_remote_pubkey.as_ref() { + if pinned.as_slice() == remote_static.as_slice() { + secure_auth_level = + secure_auth_level.max(SecureAuthLevel::SharedNodePubkeyVerified); + } else { + return Err(Error::WaitRespError( + "pinned remote static pubkey mismatch".to_owned(), + )); + } + } + + let handshake_hash = hs.get_handshake_hash().to_vec(); + + let algo = self.global_ctx.get_flags().encryption_algorithm.clone(); + let root_key = msg2_pb + .root_key_32 + .as_deref() + .filter(|v| v.len() == 32) + .map(|v| { + let mut key = [0u8; 32]; + key.copy_from_slice(v); + key + }); + let session_action = match action { + PeerConnSessionActionPb::Join => PeerSessionAction::Join, + PeerConnSessionActionPb::Sync => PeerSessionAction::Sync, + PeerConnSessionActionPb::Create => PeerSessionAction::Create, + }; + let session = self.get_peer_session_store().apply_initiator_action( + &SessionKey::new(network.network_name.clone(), remote_peer_id), + session_action, + msg2_pb.b_session_generation, + root_key, + msg2_pb.initial_epoch, + algo, + msg2_pb.server_encryption_algorithm.clone(), + )?; + + Ok(NoiseHandshakeResult { + peer_id: remote_peer_id, + session, + local_static_pubkey: local_static_pubkey.to_vec(), + remote_static_pubkey: remote_static, + handshake_hash, + secure_auth_level, + remote_network_name, + // we have authorized the peer with noise handshake, so just set secret digest same as us even remote is a shared node. + secret_digest, + client_secret_proof: None, + + my_encrypt_algo: self.my_encrypt_algo.clone(), + remote_encrypt_algo: msg2_pb.server_encryption_algorithm.clone(), + }) + } + + fn decode_handshake_message( + expected_pkt_type: PacketType, + hs: Option<&mut HandshakeState>, + pkt: ZCPacket, + ) -> Result + where + MsgT: prost::Message + Default, + { + tracing::info!( + "decode_handshake_message: {:?}, expected_pkt_type: {:?}", + pkt, + expected_pkt_type + ); + let Some(hdr) = pkt.peer_manager_header() else { + return Err(Error::WaitRespError( + "packet without peer manager header".to_owned(), + )); + }; + + if hdr.packet_type != expected_pkt_type as u8 { + return Err(Error::WaitRespError(format!( + "packet type not {:?}", + expected_pkt_type + ))); + } + + let msg = match hs { + Some(hs) => { + let mut out = vec![0u8; 4096]; + let out_len = hs + .read_message(pkt.payload(), &mut out) + .map_err(|e| Error::WaitRespError(format!("noise read msg failed: {e:?}")))?; + MsgT::decode(&out[..out_len]) + .map_err(|e| Error::WaitRespError(format!("decode message failed: {e:?}")))? + } + None => MsgT::decode(pkt.payload()) + .map_err(|e| Error::WaitRespError(format!("decode message failed: {e:?}")))?, + }; + + Ok(msg) + } + + async fn read_next_message_with_timeout( + &mut self, + read_timeout: Duration, + ) -> Result { + timeout(read_timeout, async { + let mut locked = self.recv.lock().await; + let recv = locked.as_mut().unwrap(); + Ok(recv + .next() + .await + .ok_or(Error::WaitRespError("read next message failed".to_owned()))??) + }) + .await + .map_err(|e| Error::WaitRespError(format!("read next message timeout: {e:?}")))? + } + + async fn do_noise_handshake_as_server( + &mut self, + first_msg1: ZCPacket, + mut handshake_recved: Fn, + ) -> Result + where + Fn: FnMut(&mut PeerConn, &str) -> Result<(), Error> + Send, + { + let prologue = b"easytier-peerconn-noise".to_vec(); + + let params: NoiseParams = "Noise_XX_25519_ChaChaPoly_SHA256" + .parse() + .map_err(|e| Error::WaitRespError(format!("parse noise params failed: {e:?}")))?; + let builder = snow::Builder::new(params); + + let (local_static_private_key, local_static_pubkey) = self.get_keypair()?; + + let mut hs = builder + .prologue(&prologue)? + .local_private_key(&local_static_private_key)? + .build_responder()?; + + let remote_peer_id = first_msg1 + .get_src_peer_id() + .expect("msg1 must have src peer id"); + + let msg1_pb = Self::decode_handshake_message::( + PacketType::NoiseHandshakeMsg1, + Some(&mut hs), + first_msg1, + )?; + let remote_network_name = msg1_pb.a_network_name.clone(); + + // this may update my peer id + handshake_recved(self, &remote_network_name)?; + + let server_network_name = self.global_ctx.get_network_name(); + let (role_hint, secret_proof_32) = if msg1_pb.a_network_name == server_network_name { + ( + 1, + self.global_ctx + .get_secret_proof(hs.get_handshake_hash()) + .map(|m| m.finalize().into_bytes().to_vec()), + ) + } else { + (2, None) + }; + + let algo = self.global_ctx.get_flags().encryption_algorithm.clone(); + let UpsertResponderSessionReturn { + session, + action, + session_generation: b_session_generation, + root_key: root_key_32, + initial_epoch, + } = self.get_peer_session_store().upsert_responder_session( + &SessionKey::new(remote_network_name.clone(), remote_peer_id), + msg1_pb.a_session_generation, + algo.clone(), + msg1_pb.client_encryption_algorithm.clone(), + )?; + + let b_conn_id = uuid::Uuid::new_v4(); + let msg2_pb = PeerConnNoiseMsg2Pb { + b_network_name: server_network_name, + role_hint, + action: match action { + PeerSessionAction::Join => PeerConnSessionActionPb::Join as i32, + PeerSessionAction::Sync => PeerConnSessionActionPb::Sync as i32, + PeerSessionAction::Create => PeerConnSessionActionPb::Create as i32, + }, + b_session_generation, + root_key_32: root_key_32.map(|k| k.to_vec()), + initial_epoch, + b_conn_id: Some(b_conn_id.into()), + a_conn_id_echo: msg1_pb.a_conn_id, + secret_proof_32, + server_encryption_algorithm: algo, + }; + self.send_noise_msg( + msg2_pb, + PacketType::NoiseHandshakeMsg2, + remote_peer_id, + &mut hs, + ) + .await?; + + let handshake_hash_for_proof = hs.get_handshake_hash().to_vec(); + + let msg3_pkt = timeout( + Duration::from_secs(5), + self.recv_next_peer_manager_packet(Some(PacketType::NoiseHandshakeMsg3)), + ) + .await??; + let msg3_pb = Self::decode_handshake_message::( + PacketType::NoiseHandshakeMsg3, + Some(&mut hs), + msg3_pkt, + )?; + + if msg3_pb.a_conn_id_echo != msg1_pb.a_conn_id { + return Err(Error::WaitRespError( + "noise msg3 a_conn_id mismatch".to_owned(), + )); + } + if msg3_pb.b_conn_id_echo != Some(b_conn_id.into()) { + return Err(Error::WaitRespError( + "noise msg3 b_conn_id mismatch".to_owned(), + )); + } + + let mut secure_auth_level = SecureAuthLevel::EncryptedUnauthenticated; + let Some(proof) = msg3_pb.secret_proof_32.as_ref() else { + return Err(Error::WaitRespError( + "noise msg3 secret_proof_32 is required".to_owned(), + )); + }; + + if role_hint == 1 { + if let Some(mac) = self.global_ctx.get_secret_proof(&handshake_hash_for_proof) { + if mac.verify_slice(proof).is_ok() { + secure_auth_level = + secure_auth_level.max(SecureAuthLevel::NetworkSecretConfirmed); + } else { + return Err(Error::WaitRespError("invalid secret_proof".to_owned())); + } + } + } + + let remote_static = hs + .get_remote_static() + .map(|x: &[u8]| x.to_vec()) + .unwrap_or_default(); + + let handshake_hash = hs.get_handshake_hash().to_vec(); + + Ok(NoiseHandshakeResult { + peer_id: remote_peer_id, + session, + local_static_pubkey: local_static_pubkey.to_vec(), + remote_static_pubkey: remote_static, + handshake_hash, + secure_auth_level, + remote_network_name, + secret_digest: msg3_pb.secret_digest, + client_secret_proof: Some(SecretProof { + challenge: handshake_hash_for_proof, + proof: proof.clone(), + }), + + my_encrypt_algo: self.my_encrypt_algo.clone(), + remote_encrypt_algo: msg1_pb.client_encryption_algorithm.clone(), + }) + } + + fn build_handshake_rsp(&self, noise: &NoiseHandshakeResult) -> HandshakeRequest { + tracing::info!("build_handshake_rsp: {:?}", noise); + HandshakeRequest { + magic: MAGIC, + my_peer_id: noise.peer_id, + version: VERSION, + network_name: noise.remote_network_name.clone(), + + features: Vec::new(), + network_secret_digest: noise.secret_digest.clone(), + } + } + #[tracing::instrument(skip(handshake_recved))] pub async fn do_handshake_as_server_ext( &mut self, mut handshake_recved: Fn, ) -> Result<(), Error> where - Fn: FnMut(&mut Self, &HandshakeRequest) -> Result<(), Error> + Send, + Fn: FnMut(&mut PeerConn, &str) -> Result<(), Error> + Send, { - let rsp = self.wait_handshake_loop().await?; + let first_pkt = timeout( + Duration::from_secs(5), + self.recv_next_peer_manager_packet(None), + ) + .await??; + let Some(hdr) = first_pkt.peer_manager_header() else { + return Err(Error::WaitRespError( + "first packet must have peer manager header".to_owned(), + )); + }; - handshake_recved(self, &rsp)?; + if self.is_secure_mode_enabled() && hdr.packet_type == PacketType::NoiseHandshakeMsg1 as u8 + { + let noise = self + .do_noise_handshake_as_server(first_pkt, handshake_recved) + .await?; + // construct handshake rsp from noise result for compat. + let handshake_rsp = self.build_handshake_rsp(&noise); + self.session_filter.set_session(noise.session.clone()); + self.session_filter.set_peer_id(noise.peer_id); + self.noise_handshake_result = Some(noise); - tracing::info!("handshake request: {:?}", rsp); - self.info = Some(rsp); - self.is_client = Some(false); + self.info = Some(handshake_rsp); + self.is_client = Some(false); + } else if hdr.packet_type == PacketType::HandShake as u8 { + let rsp = Self::decode_handshake_packet(&first_pkt)?; + handshake_recved(self, &rsp.network_name)?; + tracing::info!("handshake request: {:?}", rsp); + self.info = Some(rsp); + self.is_client = Some(false); - let send_digest = self.get_network_identity() == self.global_ctx.get_network_identity(); - self.send_handshake(send_digest).await?; + let send_digest = self.get_network_identity() == self.global_ctx.get_network_identity(); + self.send_handshake(send_digest).await?; + } else { + return Err(Error::WaitRespError(format!( + "unexpected packet type during handshake: {}", + hdr.packet_type + ))); + } if self.get_peer_id() == self.my_peer_id { Err(Error::WaitRespError("peer id conflict".to_owned())) @@ -328,31 +1114,28 @@ impl PeerConn { #[tracing::instrument] pub async fn do_handshake_as_server(&mut self) -> Result<(), Error> { - let rsp = self.wait_handshake_loop().await?; - tracing::info!("handshake request: {:?}", rsp); - self.info = Some(rsp); - self.is_client = Some(false); - - let send_digest = self.get_network_identity() == self.global_ctx.get_network_identity(); - self.send_handshake(send_digest).await?; - - if self.get_peer_id() == self.my_peer_id { - Err(Error::WaitRespError( - "peer id conflict, are you connecting to yourself?".to_owned(), - )) - } else { - Ok(()) - } + self.do_handshake_as_server_ext(|_, _| Ok(())).await } #[tracing::instrument] pub async fn do_handshake_as_client(&mut self) -> Result<(), Error> { - self.send_handshake(true).await?; - tracing::info!("waiting for handshake request from server"); - let rsp = self.wait_handshake_loop().await?; - tracing::info!("handshake response: {:?}", rsp); - self.info = Some(rsp); - self.is_client = Some(true); + if self.is_secure_mode_enabled() { + let noise = self.do_noise_handshake_as_client().await?; + self.session_filter.set_session(noise.session.clone()); + self.session_filter.set_peer_id(noise.peer_id); + + let handshake_rsp = self.build_handshake_rsp(&noise); + self.noise_handshake_result = Some(noise); + self.info = Some(handshake_rsp); + self.is_client = Some(true); + } else { + self.send_handshake(true).await?; + tracing::info!("waiting for handshake request from server"); + let rsp = self.wait_handshake_loop().await?; + tracing::info!("handshake response: {:?}", rsp); + self.info = Some(rsp); + self.is_client = Some(true); + } if self.get_peer_id() == self.my_peer_id { Err(Error::WaitRespError( @@ -488,7 +1271,7 @@ impl PeerConn { ret.network_secret_digest .as_mut() .unwrap() - .copy_from_slice(&info.network_secret_digrest); + .copy_from_slice(&info.network_secret_digest); ret } @@ -521,6 +1304,21 @@ impl PeerConn { is_client: self.is_client.unwrap_or_default(), network_name: info.network_name.clone(), is_closed: self.close_event_notifier.is_closed(), + noise_local_static_pubkey: self + .noise_handshake_result + .as_ref() + .map(|x| x.local_static_pubkey.clone()) + .unwrap_or_default(), + noise_remote_static_pubkey: self + .noise_handshake_result + .as_ref() + .map(|x| x.remote_static_pubkey.clone()) + .unwrap_or_default(), + secure_auth_level: self + .noise_handshake_result + .as_ref() + .map(|x| x.secure_auth_level as i32) + .unwrap_or_default(), } } @@ -529,6 +1327,7 @@ impl PeerConn { panic!("set_peer_id should only be called before handshake"); } self.my_peer_id = peer_id; + self.session_filter.set_my_peer_id(peer_id); } pub fn get_my_peer_id(&self) -> PeerId { @@ -544,26 +1343,48 @@ impl Drop for PeerConn { } #[cfg(test)] -mod tests { +pub mod tests { use std::sync::Arc; + use rand::rngs::OsRng; + use super::*; + use crate::common::config::PeerConfig; use crate::common::global_ctx::tests::get_mock_global_ctx; + use crate::common::global_ctx::GlobalCtx; use crate::common::new_peer_id; use crate::common::scoped_task::ScopedTask; use crate::peers::create_packet_recv_chan; + use crate::peers::recv_packet_from_chan; use crate::tunnel::filter::tests::DropSendTunnelFilter; use crate::tunnel::filter::PacketRecorderTunnelFilter; use crate::tunnel::ring::create_ring_tunnel_pair; + pub fn set_secure_mode_cfg(global_ctx: &GlobalCtx, enabled: bool) { + if !enabled { + global_ctx.config.set_secure_mode(None); + } else { + // generate x25519 key pair + let private = x25519_dalek::StaticSecret::random_from_rng(OsRng); + let public = x25519_dalek::PublicKey::from(&private); + + global_ctx.config.set_secure_mode(Some(SecureModeConfig { + enabled: true, + local_private_key: Some(BASE64_STANDARD.encode(private.as_bytes())), + local_public_key: Some(BASE64_STANDARD.encode(public.as_bytes())), + })); + } + } + #[tokio::test] async fn peer_conn_handshake_same_id() { + let ps = Arc::new(PeerSessionStore::new()); let (c, s) = create_ring_tunnel_pair(); let c_peer_id = new_peer_id(); let s_peer_id = c_peer_id; - let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c)); - let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s)); + let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s), ps.clone()); let (c_ret, s_ret) = tokio::join!( c_peer.do_handshake_as_client(), @@ -587,9 +1408,11 @@ mod tests { let c_peer_id = new_peer_id(); let s_peer_id = new_peer_id(); - let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c)); + let ps = Arc::new(PeerSessionStore::new()); - let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s)); + let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c), ps.clone()); + + let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s), ps.clone()); let (c_ret, s_ret) = tokio::join!( c_peer.do_handshake_as_client(), @@ -611,6 +1434,312 @@ mod tests { assert_eq!(c_peer.get_network_identity(), NetworkIdentity::default()); } + #[tokio::test] + async fn peer_conn_secure_mode_pubkey_and_encryption() { + let (c, s) = create_ring_tunnel_pair(); + + let c_recorder = Arc::new(PacketRecorderTunnelFilter::new()); + let s_recorder = Arc::new(PacketRecorderTunnelFilter::new()); + + let c = TunnelWithFilter::new(c, c_recorder.clone()); + let s = TunnelWithFilter::new(s, s_recorder.clone()); + + let c_peer_id = new_peer_id(); + let s_peer_id = new_peer_id(); + + let c_ctx = get_mock_global_ctx(); + let s_ctx = get_mock_global_ctx(); + set_secure_mode_cfg(&c_ctx, true); + set_secure_mode_cfg(&s_ctx, true); + + let ps = Arc::new(PeerSessionStore::new()); + let mut c_peer = PeerConn::new(c_peer_id, c_ctx.clone(), Box::new(c), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, s_ctx.clone(), Box::new(s), ps.clone()); + + let (c_ret, s_ret) = tokio::join!( + c_peer.do_handshake_as_client(), + s_peer.do_handshake_as_server() + ); + + c_ret.unwrap(); + s_ret.unwrap(); + + let c_info = c_peer.get_conn_info(); + let s_info = s_peer.get_conn_info(); + + assert_eq!(c_info.noise_local_static_pubkey.len(), 32); + assert_eq!(c_info.noise_remote_static_pubkey.len(), 32); + assert_eq!(s_info.noise_local_static_pubkey.len(), 32); + assert_eq!(s_info.noise_remote_static_pubkey.len(), 32); + + assert_eq!( + c_info.noise_remote_static_pubkey, + s_info.noise_local_static_pubkey + ); + assert_eq!( + s_info.noise_remote_static_pubkey, + c_info.noise_local_static_pubkey + ); + + let network = s_ctx.get_network_identity(); + let mut expected = HandshakeRequest { + magic: MAGIC, + my_peer_id: s_peer_id, + version: VERSION, + features: Vec::new(), + network_name: network.network_name.clone(), + ..Default::default() + }; + expected + .network_secret_digest + .extend_from_slice(&network.network_secret_digest.unwrap_or_default()); + let expected_payload = expected.encode_to_vec(); + + println!("sent: {:?}", c_recorder.sent.lock().unwrap()); + + let wire_hs = c_recorder + .sent + .lock() + .unwrap() + .iter() + .find(|p| { + p.peer_manager_header() + .is_some_and(|h| h.packet_type == PacketType::NoiseHandshakeMsg3 as u8) + }) + .unwrap() + .clone(); + assert_ne!(wire_hs.payload(), expected_payload.as_slice()); + } + + #[tokio::test] + async fn peer_conn_secure_mode_server_accept_legacy_client() { + let (c, s) = create_ring_tunnel_pair(); + + let c_peer_id = new_peer_id(); + let s_peer_id = new_peer_id(); + + let c_ctx = get_mock_global_ctx(); + let s_ctx = get_mock_global_ctx(); + + c_ctx + .config + .set_network_identity(NetworkIdentity::new("user".to_string(), "sec1".to_string())); + s_ctx.config.set_network_identity(NetworkIdentity { + network_name: "shared".to_string(), + network_secret: None, + network_secret_digest: None, + }); + set_secure_mode_cfg(&s_ctx, true); + + let ps = Arc::new(PeerSessionStore::new()); + let mut c_peer = PeerConn::new(c_peer_id, c_ctx, Box::new(c), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, s_ctx, Box::new(s), ps.clone()); + + let (c_ret, s_ret) = tokio::join!( + c_peer.do_handshake_as_client(), + s_peer.do_handshake_as_server() + ); + + c_ret.unwrap(); + s_ret.unwrap(); + + assert_eq!( + c_peer.get_conn_info().secure_auth_level, + SecureAuthLevel::None as i32, + ); + assert_eq!( + s_peer.get_conn_info().secure_auth_level, + SecureAuthLevel::None as i32, + ); + + assert_eq!(c_peer.get_conn_info().network_name, "shared".to_string()); + assert_eq!(s_peer.get_conn_info().network_name, "user".to_string()); + } + + #[tokio::test] + async fn peer_conn_secure_mode_different_network_name_ok() { + let (c, s) = create_ring_tunnel_pair(); + + let c_peer_id = new_peer_id(); + let s_peer_id = new_peer_id(); + + let c_ctx = get_mock_global_ctx(); + let s_ctx = get_mock_global_ctx(); + + c_ctx + .config + .set_network_identity(NetworkIdentity::new("user".to_string(), "sec1".to_string())); + s_ctx.config.set_network_identity(NetworkIdentity::new( + "shared".to_string(), + "sec2".to_string(), + )); + + set_secure_mode_cfg(&c_ctx, true); + set_secure_mode_cfg(&s_ctx, true); + + let ps = Arc::new(PeerSessionStore::new()); + let mut c_peer = PeerConn::new(c_peer_id, c_ctx, Box::new(c), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, s_ctx, Box::new(s), ps.clone()); + + let (c_ret, s_ret) = tokio::join!( + c_peer.do_handshake_as_client(), + s_peer.do_handshake_as_server() + ); + c_ret.unwrap(); + s_ret.unwrap(); + + assert_eq!( + c_peer.get_conn_info().secure_auth_level, + SecureAuthLevel::EncryptedUnauthenticated as i32, + ); + assert_eq!( + s_peer.get_conn_info().secure_auth_level, + SecureAuthLevel::EncryptedUnauthenticated as i32, + ); + + assert_eq!(c_peer.get_conn_info().network_name, "shared".to_string()); + assert_eq!(s_peer.get_conn_info().network_name, "user".to_string()); + } + + #[tokio::test] + async fn peer_conn_secure_mode_data_roundtrip() { + let (c, s) = create_ring_tunnel_pair(); + + let c_peer_id = new_peer_id(); + let s_peer_id = new_peer_id(); + + let c_ctx = get_mock_global_ctx(); + let s_ctx = get_mock_global_ctx(); + set_secure_mode_cfg(&c_ctx, true); + set_secure_mode_cfg(&s_ctx, true); + + let ps = Arc::new(PeerSessionStore::new()); + let mut c_peer = PeerConn::new(c_peer_id, c_ctx, Box::new(c), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, s_ctx, Box::new(s), ps.clone()); + + let (c_ret, s_ret) = tokio::join!( + c_peer.do_handshake_as_client(), + s_peer.do_handshake_as_server() + ); + c_ret.unwrap(); + s_ret.unwrap(); + + let (packet_send, mut packet_recv) = create_packet_recv_chan(); + s_peer.start_recv_loop(packet_send).await; + + let payload = b"secure-data-123"; + let mut pkt = ZCPacket::new_with_payload(payload); + pkt.fill_peer_manager_hdr(c_peer_id, s_peer_id, PacketType::Data as u8); + c_peer.send_msg(pkt).await.unwrap(); + + let got = timeout(Duration::from_secs(2), async move { + recv_packet_from_chan(&mut packet_recv).await + }) + .await + .unwrap() + .unwrap(); + + assert_eq!(got.payload(), payload); + assert_eq!( + got.peer_manager_header().unwrap().packet_type, + PacketType::Data as u8 + ); + } + + #[tokio::test] + async fn peer_conn_secure_mode_network_secret_confirmed() { + let (c, s) = create_ring_tunnel_pair(); + + let c_peer_id = new_peer_id(); + let s_peer_id = new_peer_id(); + + let c_ctx = get_mock_global_ctx(); + let s_ctx = get_mock_global_ctx(); + + c_ctx + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + s_ctx + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + + set_secure_mode_cfg(&c_ctx, true); + set_secure_mode_cfg(&s_ctx, true); + + let ps = Arc::new(PeerSessionStore::new()); + let mut c_peer = PeerConn::new(c_peer_id, c_ctx, Box::new(c), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, s_ctx, Box::new(s), ps.clone()); + + let (c_ret, s_ret) = tokio::join!( + c_peer.do_handshake_as_client(), + s_peer.do_handshake_as_server() + ); + c_ret.unwrap(); + s_ret.unwrap(); + + assert_eq!( + c_peer.get_conn_info().secure_auth_level, + SecureAuthLevel::NetworkSecretConfirmed as i32, + ); + assert_eq!( + s_peer.get_conn_info().secure_auth_level, + SecureAuthLevel::NetworkSecretConfirmed as i32, + ); + } + + #[tokio::test] + async fn peer_conn_secure_mode_shared_node_pubkey_verified() { + let (c, s) = create_ring_tunnel_pair(); + + let c_peer_id = new_peer_id(); + let s_peer_id = new_peer_id(); + + let c_ctx = get_mock_global_ctx(); + let s_ctx = get_mock_global_ctx(); + + c_ctx + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec2".to_string())); + s_ctx.config.set_network_identity(NetworkIdentity { + network_name: "net2".to_string(), + network_secret: None, + network_secret_digest: None, + }); + + let remote_url: url::Url = c.info().unwrap().remote_addr.unwrap().url.parse().unwrap(); + + set_secure_mode_cfg(&c_ctx, true); + set_secure_mode_cfg(&s_ctx, true); + + c_ctx.config.set_peers(vec![PeerConfig { + uri: remote_url, + peer_public_key: Some( + s_ctx + .config + .get_secure_mode() + .unwrap() + .local_public_key + .unwrap(), + ), + }]); + + let ps = Arc::new(PeerSessionStore::new()); + let mut c_peer = PeerConn::new(c_peer_id, c_ctx, Box::new(c), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, s_ctx, Box::new(s), ps.clone()); + + let (c_ret, s_ret) = tokio::join!( + c_peer.do_handshake_as_client(), + s_peer.do_handshake_as_server() + ); + c_ret.unwrap(); + s_ret.unwrap(); + + assert_eq!( + c_peer.get_conn_info().secure_auth_level, + SecureAuthLevel::SharedNodePubkeyVerified as i32, + ); + } + async fn peer_conn_pingpong_test_common( drop_start: u32, drop_end: u32, @@ -626,8 +1755,9 @@ mod tests { let c_peer_id = new_peer_id(); let s_peer_id = new_peer_id(); - let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c)); - let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s)); + let ps = Arc::new(PeerSessionStore::new()); + let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c), ps.clone()); + let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s), ps.clone()); let (c_ret, s_ret) = tokio::join!( c_peer.do_handshake_as_client(), @@ -682,8 +1812,14 @@ mod tests { #[tokio::test] async fn close_tunnel_during_handshake() { + let ps = Arc::new(PeerSessionStore::new()); let (c, s) = create_ring_tunnel_pair(); - let mut c_peer = PeerConn::new(new_peer_id(), get_mock_global_ctx(), Box::new(c)); + let mut c_peer = PeerConn::new( + new_peer_id(), + get_mock_global_ctx(), + Box::new(c), + ps.clone(), + ); let j = tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(1)).await; drop(s); diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 97771a73..2717eccb 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -32,6 +32,7 @@ use crate::{ peers::{ peer_conn::PeerConn, peer_rpc::PeerRpcManagerTransport, + peer_session::PeerSessionStore, recv_packet_from_chan, route_trait::{ForeignNetworkRouteInfoMap, MockRoute, NextHopPolicy, RouteInterface}, PeerPacketFilter, @@ -160,6 +161,8 @@ pub struct PeerManager { allow_loopback_tunnel: AtomicBool, self_tx_counters: SelfTxCounters, + + peer_session_store: Arc, } impl Debug for PeerManager { @@ -312,6 +315,8 @@ impl PeerManager { allow_loopback_tunnel: AtomicBool::new(true), self_tx_counters, + + peer_session_store: Arc::new(PeerSessionStore::new()), } } @@ -363,7 +368,23 @@ impl PeerManager { tunnel: Box, is_directly_connected: bool, ) -> Result<(PeerId, PeerConnId), Error> { - let mut peer = PeerConn::new(self.my_peer_id, self.global_ctx.clone(), tunnel); + self.add_client_tunnel_with_peer_id_hint(tunnel, is_directly_connected, None) + .await + } + + pub async fn add_client_tunnel_with_peer_id_hint( + &self, + tunnel: Box, + is_directly_connected: bool, + peer_id_hint: Option, + ) -> Result<(PeerId, PeerConnId), Error> { + let mut peer = PeerConn::new_with_peer_id_hint( + self.my_peer_id, + self.global_ctx.clone(), + tunnel, + peer_id_hint, + self.peer_session_store.clone(), + ); peer.set_is_hole_punched(!is_directly_connected); peer.do_handshake_as_client().await?; let conn_id = peer.get_conn_id(); @@ -387,9 +408,19 @@ impl PeerManager { } #[tracing::instrument] - pub async fn try_direct_connect( + pub async fn try_direct_connect(&self, connector: C) -> Result<(PeerId, PeerConnId), Error> + where + C: TunnelConnector + Debug, + { + self.try_direct_connect_with_peer_id_hint(connector, None) + .await + } + + #[tracing::instrument] + pub async fn try_direct_connect_with_peer_id_hint( &self, mut connector: C, + peer_id_hint: Option, ) -> Result<(PeerId, PeerConnId), Error> where C: TunnelConnector + Debug, @@ -398,7 +429,8 @@ impl PeerManager { let t = ns .run_async(|| async move { connector.connect().await }) .await?; - self.add_client_tunnel(t, true).await + self.add_client_tunnel_with_peer_id_hint(t, true, peer_id_hint) + .await } // avoid loop back to virtual network @@ -447,9 +479,14 @@ impl PeerManager { tracing::info!("add tunnel as server start"); self.check_remote_addr_not_from_virtual_network(&tunnel)?; - let mut conn = PeerConn::new(self.my_peer_id, self.global_ctx.clone(), tunnel); - conn.do_handshake_as_server_ext(|peer, msg| { - if msg.network_name + let mut conn = PeerConn::new( + self.my_peer_id, + self.global_ctx.clone(), + tunnel, + self.peer_session_store.clone(), + ); + conn.do_handshake_as_server_ext(|peer, network_name:&str| { + if network_name == self.global_ctx.get_network_identity().network_name { return Ok(()); @@ -463,9 +500,9 @@ impl PeerManager { let mut peer_id = self .foreign_network_manager - .get_network_peer_id(&msg.network_name); + .get_network_peer_id(network_name); if peer_id.is_none() { - peer_id = Some(*self.reserved_my_peer_id_map.entry(msg.network_name.clone()).or_insert_with(|| { + peer_id = Some(*self.reserved_my_peer_id_map.entry(network_name.to_string()).or_insert_with(|| { rand::random::() }).value()); } @@ -473,7 +510,7 @@ impl PeerManager { tracing::info!( ?peer_id, - ?msg.network_name, + ?network_name, "handshake as server with foreign network, new peer id: {}, peer id in foreign manager: {:?}", peer.get_my_peer_id(), peer_id ); @@ -1464,7 +1501,10 @@ mod tests { use std::{fmt::Debug, sync::Arc, time::Duration}; use crate::{ - common::{config::Flags, global_ctx::tests::get_mock_global_ctx}, + common::{ + config::Flags, + global_ctx::{tests::get_mock_global_ctx, NetworkIdentity}, + }, connector::{ create_connector_by_url, direct::PeerManagerForDirectConnector, udp_hole_punch::tests::create_mock_peer_manager_with_mock_stun, @@ -1472,6 +1512,7 @@ mod tests { instance::listeners::get_listener_by_url, peers::{ create_packet_recv_chan, + peer_conn::tests::set_secure_mode_cfg, peer_manager::RouteAlgoType, peer_rpc::tests::register_service, route_trait::NextHopPolicy, @@ -1480,7 +1521,10 @@ mod tests { wait_route_appear_with_cost, }, }, - proto::common::{CompressionAlgoPb, NatType, PeerFeatureFlag}, + proto::{ + common::{CompressionAlgoPb, NatType, PeerFeatureFlag}, + peer_rpc::SecureAuthLevel, + }, tunnel::{ common::tests::wait_for_condition, filter::{tests::DropSendTunnelFilter, TunnelWithFilter}, @@ -1523,6 +1567,279 @@ mod tests { .await; } + #[tokio::test] + async fn peer_manager_safe_mode_connect_between_peers() { + let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + + peer_mgr_a + .get_global_ctx() + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + peer_mgr_b + .get_global_ctx() + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + + set_secure_mode_cfg(&peer_mgr_a.get_global_ctx(), true); + set_secure_mode_cfg(&peer_mgr_b.get_global_ctx(), true); + + let (a_ring, b_ring) = create_ring_tunnel_pair(); + let (a_ret, b_ret) = tokio::join!( + peer_mgr_a.add_client_tunnel(a_ring, false), + peer_mgr_b.add_tunnel_as_server(b_ring, true) + ); + let (peer_b_id, _) = a_ret.unwrap(); + b_ret.unwrap(); + + wait_for_condition( + || { + let peer_mgr_a = peer_mgr_a.clone(); + async move { + if !peer_mgr_a + .get_peer_map() + .list_peers_with_conn() + .await + .contains(&peer_b_id) + { + return false; + } + let Some(conns) = peer_mgr_a.get_peer_map().list_peer_conns(peer_b_id).await + else { + return false; + }; + conns.iter().any(|c| { + c.noise_local_static_pubkey.len() == 32 + && c.noise_remote_static_pubkey.len() == 32 + && c.secure_auth_level == SecureAuthLevel::NetworkSecretConfirmed as i32 + }) + } + }, + Duration::from_secs(10), + ) + .await; + + let peer_a_id = peer_mgr_a.my_peer_id(); + wait_for_condition( + || { + let peer_mgr_b = peer_mgr_b.clone(); + async move { + if !peer_mgr_b + .get_peer_map() + .list_peers_with_conn() + .await + .contains(&peer_a_id) + { + return false; + } + let Some(conns) = peer_mgr_b.get_peer_map().list_peer_conns(peer_a_id).await + else { + return false; + }; + conns.iter().any(|c| { + c.noise_local_static_pubkey.len() == 32 + && c.noise_remote_static_pubkey.len() == 32 + && c.secure_auth_level == SecureAuthLevel::NetworkSecretConfirmed as i32 + }) + } + }, + Duration::from_secs(10), + ) + .await; + } + + #[tokio::test] + async fn peer_manager_safe_server_accept_legacy_client() { + let peer_mgr_client = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let peer_mgr_server = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + + peer_mgr_client + .get_global_ctx() + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + peer_mgr_server + .get_global_ctx() + .config + .set_network_identity(NetworkIdentity::new("net1".to_string(), "sec1".to_string())); + + set_secure_mode_cfg(&peer_mgr_server.get_global_ctx(), true); + + let (c_ring, s_ring) = create_ring_tunnel_pair(); + let (c_ret, s_ret) = tokio::join!( + peer_mgr_client.add_client_tunnel(c_ring, false), + peer_mgr_server.add_tunnel_as_server(s_ring, true) + ); + let (server_id, _) = c_ret.unwrap(); + s_ret.unwrap(); + + wait_for_condition( + || { + let peer_mgr_client = peer_mgr_client.clone(); + async move { + if !peer_mgr_client + .get_peer_map() + .list_peers_with_conn() + .await + .contains(&server_id) + { + return false; + } + let Some(conns) = peer_mgr_client + .get_peer_map() + .list_peer_conns(server_id) + .await + else { + return false; + }; + conns.iter().any(|c| { + c.noise_local_static_pubkey.is_empty() + && c.noise_remote_static_pubkey.is_empty() + && c.secure_auth_level == SecureAuthLevel::None as i32 + }) + } + }, + Duration::from_secs(10), + ) + .await; + + let client_id = peer_mgr_client.my_peer_id(); + wait_for_condition( + || { + let peer_mgr_server = peer_mgr_server.clone(); + async move { + if !peer_mgr_server + .get_peer_map() + .list_peers_with_conn() + .await + .contains(&client_id) + { + return false; + } + let Some(conns) = peer_mgr_server + .get_peer_map() + .list_peer_conns(client_id) + .await + else { + return false; + }; + conns.iter().any(|c| { + c.noise_local_static_pubkey.is_empty() + && c.noise_remote_static_pubkey.is_empty() + && c.secure_auth_level == SecureAuthLevel::None as i32 + }) + } + }, + Duration::from_secs(5), + ) + .await; + } + + #[tokio::test] + async fn peer_manager_safe_mode_shared_node_pinning_connect() { + let peer_mgr_client = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let peer_mgr_server = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + + peer_mgr_client + .get_global_ctx() + .config + .set_network_identity(NetworkIdentity::new("user".to_string(), "sec1".to_string())); + peer_mgr_server + .get_global_ctx() + .config + .set_network_identity(NetworkIdentity { + network_name: "shared".to_string(), + network_secret: None, + network_secret_digest: None, + }); + + set_secure_mode_cfg(&peer_mgr_client.get_global_ctx(), true); + set_secure_mode_cfg(&peer_mgr_server.get_global_ctx(), true); + + let server_pub_b64 = peer_mgr_server + .get_global_ctx() + .config + .get_secure_mode() + .unwrap() + .local_public_key + .unwrap(); + + let (a_ring, b_ring) = create_ring_tunnel_pair(); + let server_remote_url: url::Url = a_ring + .info() + .unwrap() + .remote_addr + .unwrap() + .url + .parse() + .unwrap(); + peer_mgr_client.get_global_ctx().config.set_peers(vec![ + crate::common::config::PeerConfig { + uri: server_remote_url, + peer_public_key: Some(server_pub_b64.clone()), + }, + ]); + + let (c_ret, s_ret) = tokio::join!( + peer_mgr_client.add_client_tunnel(a_ring, false), + peer_mgr_server.add_tunnel_as_server(b_ring, true) + ); + c_ret.unwrap(); + s_ret.unwrap(); + + wait_for_condition( + || { + let peer_mgr_client = peer_mgr_client.clone(); + async move { + let foreign_peer_map = + peer_mgr_client.get_foreign_network_client().get_peer_map(); + if foreign_peer_map.list_peers_with_conn().await.len() != 1 { + return false; + } + let Some(peer_id) = foreign_peer_map + .list_peers_with_conn() + .await + .into_iter() + .next() + else { + return false; + }; + let Some(conns) = foreign_peer_map.list_peer_conns(peer_id).await else { + return false; + }; + conns.iter().any(|c| { + c.secure_auth_level == SecureAuthLevel::SharedNodePubkeyVerified as i32 + && c.noise_local_static_pubkey.len() == 32 + && c.noise_remote_static_pubkey.len() == 32 + }) + } + }, + Duration::from_secs(10), + ) + .await; + + wait_for_condition( + || { + let peer_mgr_server = peer_mgr_server.clone(); + async move { + let foreigns = peer_mgr_server + .get_foreign_network_manager() + .list_foreign_networks() + .await; + let Some(entry) = foreigns.foreign_networks.get("user") else { + return false; + }; + entry.peers.iter().any(|p| { + p.conns + .iter() + .any(|c| c.noise_local_static_pubkey.len() == 32) + }) + } + }, + Duration::from_secs(10), + ) + .await; + } + async fn connect_peer_manager_with( client_mgr: Arc, server_mgr: &Arc, diff --git a/easytier/src/peers/peer_session.rs b/easytier/src/peers/peer_session.rs new file mode 100644 index 00000000..fc766cad --- /dev/null +++ b/easytier/src/peers/peer_session.rs @@ -0,0 +1,817 @@ +use std::{ + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, RwLock, + }, + time::{SystemTime, UNIX_EPOCH}, +}; + +use atomic_shim::AtomicU64; + +use anyhow::anyhow; +use dashmap::DashMap; +use hmac::{Hmac, Mac as _}; +use rand::RngCore as _; +use sha2::Sha256; + +use crate::{ + common::PeerId, + peers::encrypt::{create_encryptor, Encryptor}, + tunnel::packet_def::{AesGcmTail, ZCPacket}, +}; + +type HmacSha256 = Hmac; +pub struct UpsertResponderSessionReturn { + pub session: Arc, + pub action: PeerSessionAction, + pub session_generation: u32, + pub root_key: Option<[u8; 32]>, + pub initial_epoch: u32, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum PeerSessionAction { + Join, + Sync, + Create, +} + +#[derive(PartialEq, Clone, Eq, Hash)] +pub struct SessionKey { + network_name: String, + peer_id: PeerId, +} + +impl SessionKey { + pub fn new(network_name: String, peer_id: PeerId) -> Self { + Self { + network_name, + peer_id, + } + } +} + +#[derive(Clone)] +pub struct PeerSessionStore { + sessions: Arc>>, +} + +impl Default for PeerSessionStore { + fn default() -> Self { + Self { + sessions: Arc::new(DashMap::new()), + } + } +} + +impl PeerSessionStore { + pub fn new() -> Self { + Self::default() + } + + pub fn get(&self, key: &SessionKey) -> Option> { + self.sessions.get(key).map(|v| v.clone()) + } + + pub fn upsert_responder_session( + &self, + key: &SessionKey, + a_session_generation: Option, + send_algorithm: String, + recv_algorithm: String, + ) -> Result { + let existing = self.sessions.get(key).map(|v| v.clone()); + match existing { + None => { + let root_key = PeerSession::new_root_key(); + let session_generation = 1u32; + let initial_epoch = 0u32; + let session = Arc::new(PeerSession::new( + key.peer_id, + root_key, + session_generation, + initial_epoch, + send_algorithm, + recv_algorithm, + )); + self.sessions.insert(key.clone(), session.clone()); + Ok(UpsertResponderSessionReturn { + session, + action: PeerSessionAction::Create, + session_generation, + root_key: Some(root_key), + initial_epoch, + }) + } + Some(session) => { + session.check_encrypt_algo_same(&send_algorithm, &recv_algorithm)?; + let local_gen = session.session_generation(); + if a_session_generation.is_some_and(|g| g == local_gen) { + Ok(UpsertResponderSessionReturn { + session, + action: PeerSessionAction::Join, + session_generation: local_gen, + root_key: None, + initial_epoch: 0, + }) + } else { + let initial_epoch = session.next_sync_epoch(); + let root_key = session.root_key(); + Ok(UpsertResponderSessionReturn { + session, + action: PeerSessionAction::Sync, + session_generation: local_gen, + root_key: Some(root_key), + initial_epoch, + }) + } + } + } + } + + #[allow(clippy::too_many_arguments)] + pub fn apply_initiator_action( + &self, + key: &SessionKey, + action: PeerSessionAction, + b_session_generation: u32, + root_key_32: Option<[u8; 32]>, + initial_epoch: u32, + send_algorithm: String, + recv_algorithm: String, + ) -> Result, anyhow::Error> { + tracing::info!( + "apply_initiator_action {:?}, send_algorithm: {}, recv_algorithm: {}", + action, + send_algorithm, + recv_algorithm + ); + match action { + PeerSessionAction::Join => { + let Some(session) = self.get(key) else { + return Err(anyhow!("no local session for JOIN")); + }; + session.check_encrypt_algo_same(&send_algorithm, &recv_algorithm)?; + if session.session_generation() != b_session_generation { + return Err(anyhow!("JOIN generation mismatch")); + } + Ok(session) + } + PeerSessionAction::Sync | PeerSessionAction::Create => { + let root_key = root_key_32.ok_or_else(|| anyhow!("missing root_key"))?; + let session = self + .sessions + .entry(key.clone()) + .or_insert_with(|| { + Arc::new(PeerSession::new( + key.peer_id, + root_key, + b_session_generation, + initial_epoch, + send_algorithm.clone(), + recv_algorithm.clone(), + )) + }) + .clone(); + session.check_encrypt_algo_same(&send_algorithm, &recv_algorithm)?; + session.sync_root_key(root_key, b_session_generation, initial_epoch); + Ok(session) + } + } + } +} + +#[derive(Clone, Default)] +struct EpochKeySlot { + epoch: u32, + generation: u32, + valid: bool, + send_cipher: Option>, + recv_cipher: Option>, +} + +impl std::fmt::Debug for EpochKeySlot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EpochKeySlot") + .field("epoch", &self.epoch) + .field("generation", &self.generation) + .field("valid", &self.valid) + .finish() + } +} + +impl EpochKeySlot { + fn get_encryptor(&self, is_send: bool) -> Arc { + if is_send { + self.send_cipher.as_ref().unwrap().clone() + } else { + self.recv_cipher.as_ref().unwrap().clone() + } + } +} + +#[derive(Debug, Clone, Copy, Default)] +struct ReplayWindow256 { + max_seq: u64, + bitmap: [u8; 32], + valid: bool, +} + +impl ReplayWindow256 { + fn clear(&mut self) { + self.max_seq = 0; + self.bitmap.fill(0); + self.valid = false; + } + + fn test_bit(&self, idx: usize) -> bool { + let byte = idx / 8; + let bit = idx % 8; + (self.bitmap[byte] >> bit) & 1 == 1 + } + + fn set_bit(&mut self, idx: usize) { + let byte = idx / 8; + let bit = idx % 8; + self.bitmap[byte] |= 1u8 << bit; + } + + fn shift_right(&mut self, shift: usize) { + if shift == 0 { + return; + } + let total_bits = 256usize; + if shift >= total_bits { + self.bitmap.fill(0); + return; + } + + let byte_shift = shift / 8; + let bit_shift = shift % 8; + + if byte_shift > 0 { + for i in (0..self.bitmap.len()).rev() { + self.bitmap[i] = if i >= byte_shift { + self.bitmap[i - byte_shift] + } else { + 0 + }; + } + } + + if bit_shift > 0 { + let mut carry = 0u8; + for b in self.bitmap.iter_mut().rev() { + let new_carry = *b << (8 - bit_shift); + *b = (*b >> bit_shift) | carry; + carry = new_carry; + } + } + } + + fn accept(&mut self, seq: u64) -> bool { + if !self.valid { + self.valid = true; + self.max_seq = seq; + self.set_bit(0); + return true; + } + + if seq > self.max_seq { + let shift = (seq - self.max_seq) as usize; + self.shift_right(shift); + self.max_seq = seq; + self.set_bit(0); + return true; + } + + let delta = (self.max_seq - seq) as usize; + if delta >= 256 { + return false; + } + if self.test_bit(delta) { + return false; + } + self.set_bit(delta); + true + } +} + +#[derive(Debug, Clone, Copy, Default)] +struct EpochRxSlot { + epoch: u32, + window: ReplayWindow256, + last_rx_ms: u64, + valid: bool, +} + +impl EpochRxSlot { + fn clear(&mut self) { + self.epoch = 0; + self.window.clear(); + self.last_rx_ms = 0; + self.valid = false; + } +} + +pub struct PeerSession { + peer_id: PeerId, + root_key: RwLock<[u8; 32]>, + session_generation: AtomicU32, + + send_epoch: AtomicU32, + send_seq: [AtomicU64; 2], + send_epoch_started_ms: AtomicU64, + send_packets_since_epoch: AtomicU64, + + rx_slots: Mutex<[[EpochRxSlot; 2]; 2]>, + key_cache: Mutex<[[EpochKeySlot; 2]; 2]>, + + send_cipher_algorithm: String, + recv_cipher_algorithm: String, +} + +impl std::fmt::Debug for PeerSession { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PeerSession") + .field("peer_id", &self.peer_id) + .field("root_key", &self.root_key) + .field("session_generation", &self.session_generation) + .field("send_epoch", &self.send_epoch) + .field("send_seq", &self.send_seq) + .field("send_epoch_started_ms", &self.send_epoch_started_ms) + .field("send_packets_since_epoch", &self.send_packets_since_epoch) + .field("rx_slots", &self.rx_slots) + .field("key_cache", &self.key_cache) + .field("send_cipher_algorithm", &self.send_cipher_algorithm) + .field("recv_cipher_algorithm", &self.recv_cipher_algorithm) + .finish() + } +} + +impl PeerSession { + /// Idle-eviction timeout for receive slots, in milliseconds. + /// + /// If no packets are received for this period (~30 seconds), the + /// corresponding RX slot is considered idle and may be cleared/reused. + /// This helps reclaim state for dead peers or paths while still tolerating + /// short network stalls. Environments with very bursty or high-latency + /// traffic may want to increase this value; low-latency or tightly + /// resource-constrained deployments may lower it. + const EVICT_IDLE_AFTER_MS: u64 = 30_000; + + /// Maximum number of packets to send in a single epoch before forcing + /// a key/epoch rotation. + /// + /// This bounds the amount of traffic protected under a single set of + /// derived keys, which is a common best practice for long-lived secure + /// channels. The current value (~1 million packets) is a conservative + /// default chosen to balance security (more frequent rotation) and + /// performance (avoiding excessive rekeying). Deployments with very high + /// or very low packet rates may tune this threshold accordingly. + const ROTATE_AFTER_PACKETS: u64 = 1_000_000; + + /// Maximum wall-clock lifetime of a send epoch, in milliseconds. + /// + /// Even if the packet-based limit is not reached, epochs are rotated + /// after this duration (~10 minutes) to avoid long-lived keys and keep + /// replay windows bounded in time. This also limits the impact of a + /// compromised key. Installations that prioritize lower overhead over + /// more aggressive key rotation may increase this value; those with + /// stricter security requirements may decrease it. + const ROTATE_AFTER_MS: u64 = 10 * 60 * 1000; + const MAX_ACCEPTED_RX_EPOCH_AHEAD: u32 = 3; + + pub fn new( + peer_id: PeerId, + root_key: [u8; 32], + session_generation: u32, + initial_epoch: u32, + send_cipher_algorithm: String, + recv_cipher_algorithm: String, + ) -> Self { + // let mut root_key_128 = [0u8; 16]; + // root_key_128.copy_from_slice(&root_key[..16]); + // let send_cipher = create_encryptor(&send_algorithm, root_key_128, root_key); + // let recv_cipher = create_encryptor(&recv_algorithm, root_key_128, root_key); + let rx_slots = [ + [EpochRxSlot::default(), EpochRxSlot::default()], + [EpochRxSlot::default(), EpochRxSlot::default()], + ]; + let key_cache = [ + [EpochKeySlot::default(), EpochKeySlot::default()], + [EpochKeySlot::default(), EpochKeySlot::default()], + ]; + let now_ms = now_ms(); + Self { + peer_id, + root_key: RwLock::new(root_key), + session_generation: AtomicU32::new(session_generation), + send_epoch: AtomicU32::new(initial_epoch), + send_seq: [AtomicU64::new(0), AtomicU64::new(0)], + send_epoch_started_ms: AtomicU64::new(now_ms), + send_packets_since_epoch: AtomicU64::new(0), + rx_slots: Mutex::new(rx_slots), + key_cache: Mutex::new(key_cache), + send_cipher_algorithm, + recv_cipher_algorithm, + } + } + + pub fn peer_id(&self) -> PeerId { + self.peer_id + } + + pub fn session_generation(&self) -> u32 { + self.session_generation.load(Ordering::Relaxed) + } + + pub fn root_key(&self) -> [u8; 32] { + *self.root_key.read().unwrap() + } + + pub fn new_root_key() -> [u8; 32] { + let mut out = [0u8; 32]; + rand::rngs::OsRng.fill_bytes(&mut out); + out + } + + pub fn next_sync_epoch(&self) -> u32 { + let send_epoch = self.send_epoch.load(Ordering::Relaxed); + let rx = self.rx_slots.lock().unwrap(); + let mut max_epoch = send_epoch; + for dir in 0..2 { + let cur = rx[dir][0]; + if cur.valid { + max_epoch = max_epoch.max(cur.epoch); + } + let prev = rx[dir][1]; + if prev.valid { + max_epoch = max_epoch.max(prev.epoch); + } + } + max_epoch.wrapping_add(1) + } + + pub fn check_encrypt_algo_same( + &self, + send_algorithm: &str, + recv_algorithm: &str, + ) -> Result<(), anyhow::Error> { + if self.send_cipher_algorithm != send_algorithm + || self.recv_cipher_algorithm != recv_algorithm + { + return Err(anyhow!("encrypt algorithm not same")); + } + Ok(()) + } + + pub fn sync_root_key(&self, root_key: [u8; 32], session_generation: u32, initial_epoch: u32) { + { + let mut g = self.root_key.write().unwrap(); + *g = root_key; + } + self.session_generation + .store(session_generation, Ordering::Relaxed); + + self.send_epoch.store(initial_epoch, Ordering::Relaxed); + self.send_seq[0].store(0, Ordering::Relaxed); + self.send_seq[1].store(0, Ordering::Relaxed); + self.send_epoch_started_ms + .store(now_ms(), Ordering::Relaxed); + self.send_packets_since_epoch.store(0, Ordering::Relaxed); + + { + let mut rx = self.rx_slots.lock().unwrap(); + for dir in 0..2 { + rx[dir][0] = EpochRxSlot { + epoch: initial_epoch, + window: ReplayWindow256::default(), + last_rx_ms: 0, + valid: true, + }; + rx[dir][1].clear(); + } + } + + self.key_cache + .lock() + .unwrap() + .fill([EpochKeySlot::default(), EpochKeySlot::default()]); + } + + pub fn dir_for_sender(sender_peer_id: PeerId, receiver_peer_id: PeerId) -> usize { + if sender_peer_id < receiver_peer_id { + 0 + } else { + 1 + } + } + + fn hkdf_traffic_key(&self, epoch: u32, dir: usize) -> [u8; 32] { + let root_key = self.root_key(); + let salt = [0u8; 32]; + let mut extract = HmacSha256::new_from_slice(&salt).unwrap(); + extract.update(&root_key); + let prk = extract.finalize().into_bytes(); + + let mut info = Vec::with_capacity(9 + 4 + 1); + info.extend_from_slice(b"et-traffic"); + info.extend_from_slice(&epoch.to_be_bytes()); + info.push(dir as u8); + + let mut expand = HmacSha256::new_from_slice(&prk).unwrap(); + expand.update(&info); + expand.update(&[1u8]); + let okm = expand.finalize().into_bytes(); + let mut key = [0u8; 32]; + key.copy_from_slice(&okm[..32]); + key + } + + fn get_encryptor(&self, epoch: u32, dir: usize, is_send: bool) -> Option> { + let generation = self.session_generation(); + let rx = self.rx_slots.lock().unwrap(); + let send_epoch = self.send_epoch.load(Ordering::Relaxed); + let allowed = epoch == send_epoch + || rx[dir][0].valid && rx[dir][0].epoch == epoch + || rx[dir][1].valid && rx[dir][1].epoch == epoch; + if !allowed { + return None; + } + + let mut guard = self.key_cache.lock().unwrap(); + for slot in guard[dir].iter_mut() { + if slot.valid && slot.epoch == epoch && slot.generation == generation { + return Some(slot.get_encryptor(is_send)); + } + } + + let key = self.hkdf_traffic_key(epoch, dir); + let mut key_128 = [0u8; 16]; + key_128.copy_from_slice(&key[..16]); + + let slot = EpochKeySlot { + epoch, + generation, + valid: true, + send_cipher: Some(create_encryptor(&self.send_cipher_algorithm, key_128, key)), + recv_cipher: Some(create_encryptor(&self.recv_cipher_algorithm, key_128, key)), + }; + let ret = slot.get_encryptor(is_send); + + if !guard[dir][0].valid || guard[dir][0].epoch == epoch { + guard[dir][0] = slot; + } else { + guard[dir][1] = slot; + } + + Some(ret) + } + + fn maybe_rotate_epoch(&self, now_ms: u64) { + let packets = self + .send_packets_since_epoch + .fetch_add(1, Ordering::Relaxed) + + 1; + let started = self.send_epoch_started_ms.load(Ordering::Relaxed); + if packets < Self::ROTATE_AFTER_PACKETS + && now_ms.saturating_sub(started) < Self::ROTATE_AFTER_MS + { + return; + } + + let cur = self.send_epoch.load(Ordering::Relaxed); + let next = cur.wrapping_add(1); + if self + .send_epoch + .compare_exchange(cur, next, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + self.send_epoch_started_ms.store(now_ms, Ordering::Relaxed); + self.send_packets_since_epoch.store(0, Ordering::Relaxed); + } + } + + fn next_nonce(&self, dir: usize) -> (u32, u64, [u8; 12]) { + let now_ms = now_ms(); + self.maybe_rotate_epoch(now_ms); + let epoch = self.send_epoch.load(Ordering::Relaxed); + let seq = self.send_seq[dir].fetch_add(1, Ordering::Relaxed); + let mut nonce = [0u8; 12]; + nonce[..4].copy_from_slice(&epoch.to_be_bytes()); + nonce[4..].copy_from_slice(&seq.to_be_bytes()); + (epoch, seq, nonce) + } + + fn parse_tail(payload: &[u8]) -> Option<[u8; 12]> { + if payload.len() < std::mem::size_of::() { + return None; + } + let tail_off = payload.len() - std::mem::size_of::(); + let tail = &payload[tail_off..]; + let mut nonce = [0u8; 12]; + nonce.copy_from_slice(&tail[16..]); + Some(nonce) + } + + fn evict_old_rx_slots(rx: &mut [[EpochRxSlot; 2]; 2], now_ms: u64) { + for dir_slots in rx.iter_mut() { + for slot in dir_slots.iter_mut() { + if !slot.valid { + continue; + } + let last = slot.last_rx_ms; + if last != 0 && now_ms.saturating_sub(last) > Self::EVICT_IDLE_AFTER_MS { + slot.clear(); + } + } + } + } + + fn check_replay(&self, epoch: u32, seq: u64, dir: usize, now_ms: u64) -> bool { + let mut rx = self.rx_slots.lock().unwrap(); + Self::evict_old_rx_slots(&mut rx, now_ms); + let send_epoch = self.send_epoch.load(Ordering::Relaxed); + { + let mut key_cache = self.key_cache.lock().unwrap(); + for d in 0..2 { + for s in 0..2 { + if !key_cache[d][s].valid { + continue; + } + let e = key_cache[d][s].epoch; + let allowed = e == send_epoch + || rx[d][0].valid && rx[d][0].epoch == e + || rx[d][1].valid && rx[d][1].epoch == e; + if !allowed { + key_cache[d][s].valid = false; + } + } + } + } + + if !rx[dir][0].valid { + rx[dir][0] = EpochRxSlot { + epoch, + window: ReplayWindow256::default(), + last_rx_ms: now_ms, + valid: true, + }; + } + + if rx[dir][0].valid && epoch == rx[dir][0].epoch { + rx[dir][0].last_rx_ms = now_ms; + return rx[dir][0].window.accept(seq); + } + + if rx[dir][1].valid && epoch == rx[dir][1].epoch { + rx[dir][1].last_rx_ms = now_ms; + return rx[dir][1].window.accept(seq); + } + + if rx[dir][0].valid && epoch > rx[dir][0].epoch { + let mut baseline_epoch = send_epoch; + if rx[dir][0].valid { + baseline_epoch = baseline_epoch.max(rx[dir][0].epoch); + } + if rx[dir][1].valid { + baseline_epoch = baseline_epoch.max(rx[dir][1].epoch); + } + let max_allowed_epoch = + baseline_epoch.saturating_add(Self::MAX_ACCEPTED_RX_EPOCH_AHEAD); + if epoch > max_allowed_epoch { + return false; + } + + rx[dir][1] = rx[dir][0]; + rx[dir][0] = EpochRxSlot { + epoch, + window: ReplayWindow256::default(), + last_rx_ms: now_ms, + valid: true, + }; + return rx[dir][0].window.accept(seq); + } + + false + } + + pub fn encrypt_payload( + &self, + sender_peer_id: PeerId, + receiver_peer_id: PeerId, + pkt: &mut ZCPacket, + ) -> Result<(), anyhow::Error> { + let dir = Self::dir_for_sender(sender_peer_id, receiver_peer_id); + let (epoch, _seq, nonce_bytes) = self.next_nonce(dir); + let encryptor = self + .get_encryptor(epoch, dir, true) + .ok_or_else(|| anyhow!("no key for epoch"))?; + let _ = encryptor.encrypt_with_nonce(pkt, Some(nonce_bytes.as_slice())); + Ok(()) + } + + pub fn decrypt_payload( + &self, + sender_peer_id: PeerId, + receiver_peer_id: PeerId, + ciphertext_with_tail: &mut ZCPacket, + ) -> Result<(), anyhow::Error> { + let dir = Self::dir_for_sender(sender_peer_id, receiver_peer_id); + let nonce_bytes = + Self::parse_tail(ciphertext_with_tail.payload()).ok_or_else(|| anyhow!("no tail"))?; + let epoch = u32::from_be_bytes(nonce_bytes[..4].try_into().unwrap()); + let seq = u64::from_be_bytes(nonce_bytes[4..].try_into().unwrap()); + + let now_ms = now_ms(); + if !self.check_replay(epoch, seq, dir, now_ms) { + return Err(anyhow!("replay rejected")); + } + + let encryptor = self + .get_encryptor(epoch, dir, false) + .ok_or_else(|| anyhow!("no key for epoch"))?; + encryptor.decrypt(ciphertext_with_tail)?; + + Ok(()) + } +} + +fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn peer_session_supports_asymmetric_algorithms() { + let a: PeerId = 10; + let b: PeerId = 20; + let root_key = PeerSession::new_root_key(); + let generation = 1u32; + let initial_epoch = 0u32; + + let sa = PeerSession::new( + b, + root_key, + generation, + initial_epoch, + "aes-256-gcm".to_string(), + "chacha20-poly1305".to_string(), + ); + let sb = PeerSession::new( + a, + root_key, + generation, + initial_epoch, + "chacha20-poly1305".to_string(), + "aes-256-gcm".to_string(), + ); + + let plaintext1 = b"hello from a"; + let mut pkt1 = ZCPacket::new_with_payload(plaintext1); + pkt1.fill_peer_manager_hdr(a as u32, b as u32, 0); + sa.encrypt_payload(a, b, &mut pkt1).unwrap(); + sb.decrypt_payload(a, b, &mut pkt1).unwrap(); + assert_eq!(pkt1.payload(), plaintext1); + + let plaintext2 = b"hello from b"; + let mut pkt2 = ZCPacket::new_with_payload(plaintext2); + pkt2.fill_peer_manager_hdr(b as u32, a as u32, 0); + sb.encrypt_payload(b, a, &mut pkt2).unwrap(); + sa.decrypt_payload(b, a, &mut pkt2).unwrap(); + assert_eq!(pkt2.payload(), plaintext2); + } + + #[test] + fn replay_rejects_far_future_epoch_without_poisoning_window() { + let peer_id: PeerId = 10; + let root_key = PeerSession::new_root_key(); + let generation = 1u32; + let initial_epoch = 0u32; + let s = PeerSession::new( + peer_id, + root_key, + generation, + initial_epoch, + "aes-256-gcm".to_string(), + "aes-256-gcm".to_string(), + ); + + let now = now_ms(); + + assert!(s.check_replay(0, 1, 0, now)); + assert!(s.check_replay(0, 2, 0, now)); + + assert!(!s.check_replay(1000, 1, 0, now)); + + assert!(s.check_replay(1, 1, 0, now + 1)); + assert!(s.check_replay(1, 2, 0, now + 2)); + } +} diff --git a/easytier/src/proto/api_instance.proto b/easytier/src/proto/api_instance.proto index 2afc2c5d..e6947cc8 100644 --- a/easytier/src/proto/api_instance.proto +++ b/easytier/src/proto/api_instance.proto @@ -41,6 +41,9 @@ message PeerConnInfo { bool is_client = 8; string network_name = 9; bool is_closed = 10; + bytes noise_local_static_pubkey = 11; + bytes noise_remote_static_pubkey = 12; + peer_rpc.SecureAuthLevel secure_auth_level = 13; } message PeerInfo { diff --git a/easytier/src/proto/api_manage.proto b/easytier/src/proto/api_manage.proto index befd88a7..de52ebc1 100644 --- a/easytier/src/proto/api_manage.proto +++ b/easytier/src/proto/api_manage.proto @@ -81,6 +81,8 @@ message NetworkConfig { optional common.CompressionAlgoPb data_compress_algo = 52; optional string encryption_algorithm = 53; optional bool disable_tcp_hole_punching = 54; + + common.SecureModeConfig secure_mode = 55; } message PortForwardConfig { diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index 96e89d0b..9bbcb7d6 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -230,3 +230,13 @@ message LimiterConfig { optional uint64 fill_duration_ms = 3; // default 10ms, the period to fill the bucket } + +message SecureModeConfig { + bool enabled = 1; + + // base64(X25519 private key), used by shared node to present a stable identity + optional string local_private_key = 2; + + // base64(X25519 public key), required if local_private_key is set + optional string local_public_key = 3; +} diff --git a/easytier/src/proto/common.rs b/easytier/src/proto/common.rs index 5a153c11..e655c0c5 100644 --- a/easytier/src/proto/common.rs +++ b/easytier/src/proto/common.rs @@ -4,6 +4,7 @@ use std::{ }; use anyhow::Context; +use base64::{prelude::BASE64_STANDARD, Engine as _}; use crate::tunnel::packet_def::CompressorAlgo; @@ -360,3 +361,37 @@ impl fmt::Debug for Ipv6Addr { write!(f, "{}", std_ipv6_addr) } } + +impl SecureModeConfig { + pub fn private_key(&self) -> anyhow::Result { + let local_private_key = self + .local_private_key + .as_ref() + .ok_or_else(|| anyhow::anyhow!("local private key is not set"))?; + let k = BASE64_STANDARD + .decode(local_private_key) + .with_context(|| format!("failed to decode private key: {}", local_private_key))?; + // convert vec to 32b array + let len = k.len(); + let k: [u8; 32] = k + .try_into() + .map_err(|_| anyhow::anyhow!("invalid private key length: {}", len))?; + Ok(x25519_dalek::StaticSecret::from(k)) + } + + pub fn public_key(&self) -> anyhow::Result { + let local_public_key = self + .local_public_key + .as_ref() + .ok_or_else(|| anyhow::anyhow!("local public key is not set"))?; + let k = BASE64_STANDARD + .decode(local_public_key) + .with_context(|| format!("failed to decode public key: {}", local_public_key))?; + // convert vec to 32b array + let len = k.len(); + let k: [u8; 32] = k + .try_into() + .map_err(|_| anyhow::anyhow!("invalid public key length: {}", len))?; + Ok(x25519_dalek::PublicKey::from(k)) + } +} diff --git a/easytier/src/proto/peer_rpc.proto b/easytier/src/proto/peer_rpc.proto index 3376a63f..8d162560 100644 --- a/easytier/src/proto/peer_rpc.proto +++ b/easytier/src/proto/peer_rpc.proto @@ -251,10 +251,51 @@ message HandshakeRequest { uint32 version = 3; repeated string features = 4; string network_name = 5; - bytes network_secret_digrest = 6; + bytes network_secret_digest = 6; } message KcpConnData { common.SocketAddr src = 1; common.SocketAddr dst = 4; } + +enum SecureAuthLevel { + None = 0; + EncryptedUnauthenticated = 1; + SharedNodePubkeyVerified = 2; + NetworkSecretConfirmed = 3; +} + +enum PeerConnSessionActionPb { + Join = 0; + Sync = 1; + Create = 2; +} + +message PeerConnNoiseMsg1Pb { + uint32 version = 1; + string a_network_name = 2; + optional uint32 a_session_generation = 3; + common.UUID a_conn_id = 4; + string client_encryption_algorithm = 5; +} + +message PeerConnNoiseMsg2Pb { + string b_network_name = 1; + uint32 role_hint = 2; + PeerConnSessionActionPb action = 3; + uint32 b_session_generation = 4; + optional bytes root_key_32 = 5; + uint32 initial_epoch = 6; + common.UUID b_conn_id = 7; + common.UUID a_conn_id_echo = 8; + optional bytes secret_proof_32 = 9; + string server_encryption_algorithm = 10; +} + +message PeerConnNoiseMsg3Pb { + common.UUID a_conn_id_echo = 1; + common.UUID b_conn_id_echo = 2; + optional bytes secret_proof_32 = 3; + bytes secret_digest = 4; +} diff --git a/easytier/src/tunnel/filter.rs b/easytier/src/tunnel/filter.rs index 2d1cf7f0..929f3537 100644 --- a/easytier/src/tunnel/filter.rs +++ b/easytier/src/tunnel/filter.rs @@ -220,14 +220,14 @@ impl TunnelFilter for PacketRecorderTunnelFilter { type FilterOutput = (Vec, Vec); fn before_send(&self, data: SinkItem) -> Option { - self.received.lock().unwrap().push(data.clone()); + self.sent.lock().unwrap().push(data.clone()); Some(data) } fn after_received(&self, data: StreamItem) -> Option { match data { Ok(v) => { - self.sent.lock().unwrap().push(v.clone()); + self.received.lock().unwrap().push(v.clone()); Some(Ok(v)) } Err(e) => Some(Err(e)), @@ -236,8 +236,8 @@ impl TunnelFilter for PacketRecorderTunnelFilter { fn filter_output(&self) -> Self::FilterOutput { ( - self.received.lock().unwrap().clone(), self.sent.lock().unwrap().clone(), + self.received.lock().unwrap().clone(), ) } } diff --git a/easytier/src/tunnel/packet_def.rs b/easytier/src/tunnel/packet_def.rs index 4be1733b..095eb321 100644 --- a/easytier/src/tunnel/packet_def.rs +++ b/easytier/src/tunnel/packet_def.rs @@ -56,7 +56,7 @@ pub struct WGTunnelHeader { } pub const WG_TUNNEL_HEADER_SIZE: usize = std::mem::size_of::(); -#[derive(AsBytes, FromZeroes, Clone, Debug)] +#[derive(AsBytes, FromZeroes, Copy, Clone, Debug)] #[repr(u8)] pub enum PacketType { Invalid = 0, @@ -72,6 +72,9 @@ pub enum PacketType { ForeignNetworkPacket = 10, KcpSrc = 11, KcpDst = 12, + NoiseHandshakeMsg1 = 13, + NoiseHandshakeMsg2 = 14, + NoiseHandshakeMsg3 = 15, } bitflags::bitflags! {