Files
telemt/src/transport/middle_proxy/pool.rs
T

1005 lines
37 KiB
Rust
Raw Normal View History

use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
2026-02-14 01:36:14 +03:00
use std::sync::Arc;
2026-02-24 00:04:12 +03:00
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, AtomicU64, AtomicUsize, Ordering};
2026-02-14 01:36:14 +03:00
use bytes::BytesMut;
2026-02-14 04:19:44 +03:00
use rand::Rng;
2026-02-15 14:02:00 +03:00
use rand::seq::SliceRandom;
2026-02-19 13:35:56 +03:00
use tokio::sync::{Mutex, RwLock, mpsc, Notify};
2026-02-17 03:40:39 +03:00
use tokio_util::sync::CancellationToken;
2026-02-14 01:36:14 +03:00
use tracing::{debug, info, warn};
2026-02-24 00:04:12 +03:00
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
2026-02-14 01:36:14 +03:00
2026-02-16 12:10:59 +03:00
use crate::crypto::SecureRandom;
2026-02-14 01:36:14 +03:00
use crate::error::{ProxyError, Result};
2026-02-18 06:01:52 +03:00
use crate::network::probe::NetworkDecision;
use crate::network::IpFamily;
2026-02-14 01:36:14 +03:00
use crate::protocol::constants::*;
use super::ConnRegistry;
2026-02-23 02:28:00 +03:00
use super::registry::BoundConn;
2026-02-19 13:35:56 +03:00
use super::codec::{RpcWriter, WriterCommand};
2026-02-14 01:36:14 +03:00
use super::reader::reader_loop;
2026-02-14 01:51:10 +03:00
/// Base ping interval, in seconds, used by the ping task when the configurable
/// keepalive (`me_keepalive_enabled`) is off.
const ME_ACTIVE_PING_SECS: u64 = 25;
2026-02-14 04:19:44 +03:00
/// Maximum jitter, in seconds, added to or subtracted from `ME_ACTIVE_PING_SECS`
/// for each wait when the configurable keepalive is off.
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
2026-02-14 01:36:14 +03:00
2026-02-17 03:40:39 +03:00
/// Handle to one established middle-proxy connection held by the pool.
///
/// Cloning is cheap: the flags are `Arc`-shared, so every clone observes the
/// same draining/degraded state.
#[derive(Clone)]
pub struct MeWriter {
/// Pool-unique identifier, allocated from `MePool::next_writer_id`.
pub id: u64,
/// Remote endpoint this writer is connected to.
pub addr: SocketAddr,
2026-02-24 00:04:12 +03:00
/// Pool generation that was current when the writer was created.
pub generation: u64,
2026-02-19 13:35:56 +03:00
/// Command channel feeding the task that owns the socket's write half.
pub tx: mpsc::Sender<WriterCommand>,
2026-02-17 03:40:39 +03:00
/// Cancelled when the writer is removed; the writer and ping tasks watch it.
pub cancel: CancellationToken,
/// Flag handed to `reader_loop`.
// NOTE(review): the exact meaning of "degraded" is set by the reader; confirm there.
pub degraded: Arc<AtomicBool>,
/// Set once the writer has been marked draining.
pub draining: Arc<AtomicBool>,
2026-02-24 00:04:12 +03:00
/// Unix time (seconds) at which draining started; `0` until marked draining.
pub draining_started_at_epoch_secs: Arc<AtomicU64>,
/// Whether a draining writer may still accept new bindings (within the drain TTL).
pub allow_drain_fallback: Arc<AtomicBool>,
2026-02-17 03:40:39 +03:00
}
2026-02-24 03:40:59 +03:00
#[allow(dead_code)]
2026-02-14 01:36:14 +03:00
pub struct MePool {
2026-02-14 01:51:10 +03:00
pub(super) registry: Arc<ConnRegistry>,
2026-02-17 03:40:39 +03:00
pub(super) writers: Arc<RwLock<Vec<MeWriter>>>,
2026-02-14 01:51:10 +03:00
pub(super) rr: AtomicU64,
2026-02-18 06:01:52 +03:00
pub(super) decision: NetworkDecision,
pub(super) rng: Arc<SecureRandom>,
2026-02-14 01:51:10 +03:00
pub(super) proxy_tag: Option<Vec<u8>>,
2026-02-15 13:14:50 +03:00
pub(super) proxy_secret: Arc<RwLock<Vec<u8>>>,
2026-02-14 04:19:44 +03:00
pub(super) nat_ip_cfg: Option<IpAddr>,
2026-02-15 13:14:50 +03:00
pub(super) nat_ip_detected: Arc<RwLock<Option<IpAddr>>>,
2026-02-14 12:44:20 +03:00
pub(super) nat_probe: bool,
pub(super) nat_stun: Option<String>,
2026-02-19 13:35:56 +03:00
pub(super) nat_stun_servers: Vec<String>,
pub(super) detected_ipv6: Option<Ipv6Addr>,
2026-02-18 19:49:19 +03:00
pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8,
pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool,
2026-02-19 13:35:56 +03:00
pub(super) stun_backoff_until: Arc<RwLock<Option<Instant>>>,
2026-02-18 19:50:16 +03:00
pub(super) me_one_retry: u8,
pub(super) me_one_timeout: Duration,
2026-02-19 15:39:30 +03:00
pub(super) me_keepalive_enabled: bool,
pub(super) me_keepalive_interval: Duration,
pub(super) me_keepalive_jitter: Duration,
pub(super) me_keepalive_payload_random: bool,
pub(super) me_warmup_stagger_enabled: bool,
pub(super) me_warmup_step_delay: Duration,
pub(super) me_warmup_step_jitter: Duration,
pub(super) me_reconnect_max_concurrent_per_dc: u32,
pub(super) me_reconnect_backoff_base: Duration,
pub(super) me_reconnect_backoff_cap: Duration,
pub(super) me_reconnect_fast_retry_count: u32,
2026-02-15 13:14:50 +03:00
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) default_dc: AtomicI32,
2026-02-17 03:40:39 +03:00
pub(super) next_writer_id: AtomicU64,
pub(super) ping_tracker: Arc<Mutex<HashMap<i64, (std::time::Instant, u64)>>>,
pub(super) rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
2026-02-18 06:01:52 +03:00
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
2026-02-19 13:35:56 +03:00
pub(super) writer_available: Arc<Notify>,
pub(super) conn_count: AtomicUsize,
2026-02-19 15:49:35 +03:00
pub(super) stats: Arc<crate::stats::Stats>,
2026-02-24 00:04:12 +03:00
pub(super) generation: AtomicU64,
pub(super) hardswap: AtomicBool,
pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_pool_force_close_secs: AtomicU64,
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
2026-02-14 01:36:14 +03:00
pool_size: usize,
}
2026-02-18 06:01:52 +03:00
/// Last observed NAT reflection result for each address family.
///
/// Each slot stores the instant paired with the reflected socket address, or
/// `None` when nothing has been cached.
#[derive(Debug, Default)]
pub struct NatReflectionCache {
    /// Cached IPv4 entry.
    pub v4: Option<(Instant, SocketAddr)>,
    /// Cached IPv6 entry.
    pub v6: Option<(Instant, SocketAddr)>,
}
2026-02-14 01:36:14 +03:00
impl MePool {
2026-02-24 00:04:12 +03:00
/// Converts a ratio into permille, saturating to the `0..=1000` range.
///
/// Values below `0.0` map to 0 and values above `1.0` map to 1000. A NaN input
/// stays NaN through `clamp` and becomes 0 in the float-to-integer cast.
fn ratio_to_permille(ratio: f32) -> u32 {
    let bounded = ratio.clamp(0.0, 1.0);
    let scaled = bounded * 1000.0;
    scaled.round() as u32
}
/// Converts permille back into a ratio in `0.0..=1.0`; inputs above 1000 are
/// treated as 1000.
fn permille_to_ratio(permille: u32) -> f32 {
    let capped = if permille > 1000 { 1000 } else { permille };
    capped as f32 / 1000.0
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_epoch_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
2026-02-14 01:36:14 +03:00
pub fn new(
proxy_tag: Option<Vec<u8>>,
proxy_secret: Vec<u8>,
nat_ip: Option<IpAddr>,
2026-02-14 12:44:20 +03:00
nat_probe: bool,
nat_stun: Option<String>,
2026-02-19 13:35:56 +03:00
nat_stun_servers: Vec<String>,
detected_ipv6: Option<Ipv6Addr>,
2026-02-18 19:50:16 +03:00
me_one_retry: u8,
me_one_timeout_ms: u64,
2026-02-15 13:14:50 +03:00
proxy_map_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
proxy_map_v6: HashMap<i32, Vec<(IpAddr, u16)>>,
default_dc: Option<i32>,
2026-02-18 06:01:52 +03:00
decision: NetworkDecision,
rng: Arc<SecureRandom>,
2026-02-19 15:49:35 +03:00
stats: Arc<crate::stats::Stats>,
2026-02-19 15:39:30 +03:00
me_keepalive_enabled: bool,
me_keepalive_interval_secs: u64,
me_keepalive_jitter_secs: u64,
me_keepalive_payload_random: bool,
me_warmup_stagger_enabled: bool,
me_warmup_step_delay_ms: u64,
me_warmup_step_jitter_ms: u64,
me_reconnect_max_concurrent_per_dc: u32,
me_reconnect_backoff_base_ms: u64,
me_reconnect_backoff_cap_ms: u64,
me_reconnect_fast_retry_count: u32,
2026-02-24 00:04:12 +03:00
hardswap: bool,
me_pool_drain_ttl_secs: u64,
me_pool_force_close_secs: u64,
me_pool_min_fresh_ratio: f32,
2026-02-14 01:36:14 +03:00
) -> Arc<Self> {
Arc::new(Self {
registry: Arc::new(ConnRegistry::new()),
writers: Arc::new(RwLock::new(Vec::new())),
rr: AtomicU64::new(0),
2026-02-18 06:01:52 +03:00
decision,
rng,
2026-02-14 01:36:14 +03:00
proxy_tag,
2026-02-15 13:14:50 +03:00
proxy_secret: Arc::new(RwLock::new(proxy_secret)),
2026-02-14 04:19:44 +03:00
nat_ip_cfg: nat_ip,
2026-02-15 13:14:50 +03:00
nat_ip_detected: Arc::new(RwLock::new(None)),
2026-02-14 12:44:20 +03:00
nat_probe,
nat_stun,
2026-02-19 13:35:56 +03:00
nat_stun_servers,
detected_ipv6,
2026-02-18 19:49:19 +03:00
nat_probe_attempts: std::sync::atomic::AtomicU8::new(0),
nat_probe_disabled: std::sync::atomic::AtomicBool::new(false),
2026-02-19 13:35:56 +03:00
stun_backoff_until: Arc::new(RwLock::new(None)),
2026-02-18 19:50:16 +03:00
me_one_retry,
me_one_timeout: Duration::from_millis(me_one_timeout_ms),
2026-02-19 15:49:35 +03:00
stats,
2026-02-19 15:39:30 +03:00
me_keepalive_enabled,
me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
me_keepalive_payload_random,
me_warmup_stagger_enabled,
me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),
me_reconnect_max_concurrent_per_dc,
me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms),
me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms),
me_reconnect_fast_retry_count,
2026-02-14 01:36:14 +03:00
pool_size: 2,
2026-02-15 13:14:50 +03:00
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
default_dc: AtomicI32::new(default_dc.unwrap_or(0)),
2026-02-17 03:40:39 +03:00
next_writer_id: AtomicU64::new(1),
ping_tracker: Arc::new(Mutex::new(HashMap::new())),
rtt_stats: Arc::new(Mutex::new(HashMap::new())),
2026-02-18 06:01:52 +03:00
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
2026-02-19 13:35:56 +03:00
writer_available: Arc::new(Notify::new()),
conn_count: AtomicUsize::new(0),
2026-02-24 00:04:12 +03:00
generation: AtomicU64::new(1),
hardswap: AtomicBool::new(hardswap),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(me_pool_min_fresh_ratio)),
2026-02-14 01:36:14 +03:00
})
}
/// Returns `true` when the pool was constructed with a proxy tag.
pub fn has_proxy_tag(&self) -> bool {
self.proxy_tag.is_some()
}
2026-02-24 00:04:12 +03:00
/// Returns the pool's current generation counter (relaxed load).
pub fn current_generation(&self) -> u64 {
self.generation.load(Ordering::Relaxed)
}
/// Replaces the reinit/drain policy at runtime.
///
/// * `hardswap` - whether reinit should swap to a fully fresh generation.
/// * `drain_ttl_secs` - how long a draining writer may still accept bindings.
/// * `force_close_secs` - force-close deadline for draining writers; 0 disables it.
/// * `min_fresh_ratio` - minimum coverage ratio; stored as permille, clamped to `0..=1000`.
///
/// Each value is written with an independent relaxed store, so a concurrent
/// reader may briefly observe a mix of old and new settings.
pub fn update_runtime_reinit_policy(
&self,
hardswap: bool,
drain_ttl_secs: u64,
force_close_secs: u64,
min_fresh_ratio: f32,
) {
self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs.store(drain_ttl_secs, Ordering::Relaxed);
self.me_pool_force_close_secs
.store(force_close_secs, Ordering::Relaxed);
self.me_pool_min_fresh_ratio_permille
.store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed);
}
2026-02-19 13:35:56 +03:00
/// Resets the NAT probe bookkeeping: zeroes the attempt counter and clears
/// the "probe disabled" flag.
pub fn reset_stun_state(&self) {
self.nat_probe_attempts.store(0, Ordering::Relaxed);
self.nat_probe_disabled.store(false, Ordering::Relaxed);
}
2026-02-14 01:36:14 +03:00
/// Returns `addr` with its IP passed through `translate_ip_for_nat`; the port
/// is kept as-is.
pub fn translate_our_addr(&self, addr: SocketAddr) -> SocketAddr {
    let port = addr.port();
    let translated_ip = self.translate_ip_for_nat(addr.ip());
    SocketAddr::new(translated_ip, port)
}
/// Borrows the connection registry shared by this pool.
pub fn registry(&self) -> &Arc<ConnRegistry> {
&self.registry
}
2026-02-17 03:40:39 +03:00
/// Returns another handle to the shared writer list (reference-count bump only).
fn writers_arc(&self) -> Arc<RwLock<Vec<MeWriter>>> {
    Arc::clone(&self.writers)
}
2026-02-24 00:04:12 +03:00
/// Returns the configured force-close deadline for draining writers, or
/// `None` when it is set to 0 (disabled).
fn force_close_timeout(&self) -> Option<Duration> {
    match self.me_pool_force_close_secs.load(Ordering::Relaxed) {
        0 => None,
        secs => Some(Duration::from_secs(secs)),
    }
}
/// Computes which share of the desired DCs has at least one active writer.
///
/// A DC counts as covered when any of its endpoints appears in
/// `active_writer_addrs`. DCs whose endpoint set is empty cannot be connected
/// to at all, so they are left out of both the numerator and the denominator
/// (previously they inflated the denominator and made full coverage
/// unreachable while never being reported as missing).
///
/// Returns `(ratio, missing_dc)`: `ratio` is in `0.0..=1.0`, and is `1.0`
/// when there is no DC with endpoints to cover; `missing_dc` lists the
/// uncovered DC ids in ascending order.
fn coverage_ratio(
    desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
    active_writer_addrs: &HashSet<SocketAddr>,
) -> (f32, Vec<i32>) {
    let mut missing_dc = Vec::<i32>::new();
    let mut covered = 0usize;
    let mut coverable = 0usize;

    for (dc, endpoints) in desired_by_dc {
        if endpoints.is_empty() {
            continue;
        }
        coverable += 1;
        if endpoints.iter().any(|addr| active_writer_addrs.contains(addr)) {
            covered += 1;
        } else {
            missing_dc.push(*dc);
        }
    }

    if coverable == 0 {
        // Nothing that could be covered: vacuously complete.
        return (1.0, missing_dc);
    }

    // HashMap iteration order is arbitrary; sort for stable log output.
    missing_dc.sort_unstable();
    let ratio = (covered as f32) / (coverable as f32);
    (ratio, missing_dc)
}
2026-02-17 03:40:39 +03:00
pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) {
2026-02-15 14:02:00 +03:00
let writers = self.writers.read().await;
2026-02-19 13:35:56 +03:00
let current: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.map(|w| w.addr)
.collect();
2026-02-15 14:02:00 +03:00
drop(writers);
2026-02-18 06:01:52 +03:00
for family in self.family_order() {
let map = self.proxy_map_for_family(family).await;
for (_dc, addrs) in map.iter() {
let dc_addrs: Vec<SocketAddr> = addrs
.iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect();
if !dc_addrs.iter().any(|a| current.contains(a)) {
let mut shuffled = dc_addrs.clone();
shuffled.shuffle(&mut rand::rng());
for addr in shuffled {
if self.connect_one(addr, rng).await.is_ok() {
break;
}
2026-02-15 14:02:00 +03:00
}
}
}
2026-02-18 06:01:52 +03:00
if !self.decision.effective_multipath && !current.is_empty() {
break;
}
2026-02-15 14:02:00 +03:00
}
}
/// Collects the endpoints the pool should be connected to, grouped by the
/// absolute DC id.
///
/// Only families enabled in `decision` contribute. Entries for `dc` and
/// `-dc`, and for IPv4 and IPv6, are merged into one set per `|dc|`.
async fn desired_dc_endpoints(&self) -> HashMap<i32, HashSet<SocketAddr>> {
    let sources = [
        (self.decision.ipv4_me, &self.proxy_map_v4),
        (self.decision.ipv6_me, &self.proxy_map_v6),
    ];

    let mut desired: HashMap<i32, HashSet<SocketAddr>> = HashMap::new();
    for (enabled, source) in sources {
        if !enabled {
            continue;
        }
        // Clone so the lock is not held while merging.
        let snapshot = source.read().await.clone();
        for (dc, addrs) in snapshot {
            desired
                .entry(dc.abs())
                .or_default()
                .extend(addrs.into_iter().map(|(ip, port)| SocketAddr::new(ip, port)));
        }
    }
    desired
}
2026-02-24 00:04:12 +03:00
async fn warmup_generation_for_all_dcs(
self: &Arc<Self>,
rng: &SecureRandom,
2026-02-24 00:04:12 +03:00
generation: u64,
desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
) {
2026-02-24 00:04:12 +03:00
for endpoints in desired_by_dc.values() {
if endpoints.is_empty() {
continue;
}
let has_fresh = {
let ws = self.writers.read().await;
ws.iter().any(|w| {
!w.draining.load(Ordering::Relaxed)
&& w.generation == generation
&& endpoints.contains(&w.addr)
})
};
if has_fresh {
continue;
}
2026-02-24 00:04:12 +03:00
let mut shuffled: Vec<SocketAddr> = endpoints.iter().copied().collect();
shuffled.shuffle(&mut rand::rng());
for addr in shuffled {
if self.connect_one(addr, rng).await.is_ok() {
break;
}
}
}
}
pub async fn zero_downtime_reinit_after_map_change(
self: &Arc<Self>,
rng: &SecureRandom,
) {
let desired_by_dc = self.desired_dc_endpoints().await;
if desired_by_dc.is_empty() {
warn!("ME endpoint map is empty after update; skipping stale writer drain");
return;
}
2026-02-24 00:04:12 +03:00
let previous_generation = self.current_generation();
let generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
let hardswap = self.hardswap.load(Ordering::Relaxed);
if hardswap {
self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc)
.await;
} else {
self.reconcile_connections(rng).await;
}
let writers = self.writers.read().await;
let active_writer_addrs: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.map(|w| w.addr)
.collect();
2026-02-24 00:04:12 +03:00
let min_ratio = Self::permille_to_ratio(
self.me_pool_min_fresh_ratio_permille
.load(Ordering::Relaxed),
);
let (coverage_ratio, missing_dc) = Self::coverage_ratio(&desired_by_dc, &active_writer_addrs);
if !hardswap && coverage_ratio < min_ratio {
warn!(
previous_generation,
generation,
coverage_ratio = format_args!("{coverage_ratio:.3}"),
min_ratio = format_args!("{min_ratio:.3}"),
missing_dc = ?missing_dc,
"ME reinit coverage below threshold; keeping stale writers"
);
return;
}
2026-02-24 00:04:12 +03:00
if hardswap {
let fresh_writer_addrs: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.map(|w| w.addr)
.collect();
let (fresh_ratio, fresh_missing_dc) =
Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs);
if !fresh_missing_dc.is_empty() {
warn!(
previous_generation,
generation,
fresh_ratio = format_args!("{fresh_ratio:.3}"),
missing_dc = ?fresh_missing_dc,
"ME hardswap pending: fresh generation coverage incomplete"
);
return;
}
} else if !missing_dc.is_empty() {
warn!(
missing_dc = ?missing_dc,
2026-02-23 16:09:09 +03:00
// Keep stale writers alive when fresh coverage is incomplete.
"ME reinit coverage incomplete after map update; keeping stale writers"
);
return;
}
let desired_addrs: HashSet<SocketAddr> = desired_by_dc
.values()
.flat_map(|set| set.iter().copied())
.collect();
let stale_writer_ids: Vec<u64> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
2026-02-24 00:04:12 +03:00
.filter(|w| {
if hardswap {
w.generation < generation
} else {
!desired_addrs.contains(&w.addr)
}
})
.map(|w| w.id)
.collect();
drop(writers);
if stale_writer_ids.is_empty() {
debug!("ME map update completed with no stale writers");
return;
}
2026-02-24 00:04:12 +03:00
let drain_timeout = self.force_close_timeout();
let drain_timeout_secs = drain_timeout.map(|d| d.as_secs()).unwrap_or(0);
info!(
stale_writers = stale_writer_ids.len(),
2026-02-24 00:04:12 +03:00
previous_generation,
generation,
hardswap,
coverage_ratio = format_args!("{coverage_ratio:.3}"),
min_ratio = format_args!("{min_ratio:.3}"),
drain_timeout_secs,
"ME map update covered; draining stale writers"
);
2026-02-24 00:04:12 +03:00
self.stats.increment_pool_swap_total();
for writer_id in stale_writer_ids {
2026-02-24 00:04:12 +03:00
self.mark_writer_draining_with_timeout(writer_id, drain_timeout, !hardswap)
.await;
}
}
2026-02-15 13:14:50 +03:00
pub async fn update_proxy_maps(
&self,
new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
new_v6: Option<HashMap<i32, Vec<(IpAddr, u16)>>>,
) -> bool {
let mut changed = false;
{
let mut guard = self.proxy_map_v4.write().await;
if !new_v4.is_empty() && *guard != new_v4 {
*guard = new_v4;
changed = true;
}
}
if let Some(v6) = new_v6 {
let mut guard = self.proxy_map_v6.write().await;
if !v6.is_empty() && *guard != v6 {
*guard = v6;
2026-02-19 13:35:56 +03:00
changed = true;
}
}
// Ensure negative DC entries mirror positives when absent (Telegram convention).
{
let mut guard = self.proxy_map_v4.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
2026-02-24 05:57:53 +03:00
if !guard.contains_key(&-k)
&& let Some(addrs) = guard.get(&k).cloned()
{
guard.insert(-k, addrs);
2026-02-19 13:35:56 +03:00
}
}
}
{
let mut guard = self.proxy_map_v6.write().await;
let keys: Vec<i32> = guard.keys().cloned().collect();
for k in keys.iter().cloned().filter(|k| *k > 0) {
2026-02-24 05:57:53 +03:00
if !guard.contains_key(&-k)
&& let Some(addrs) = guard.get(&k).cloned()
{
guard.insert(-k, addrs);
2026-02-19 13:35:56 +03:00
}
2026-02-15 13:14:50 +03:00
}
}
changed
}
2026-02-19 13:35:56 +03:00
pub async fn update_secret(self: &Arc<Self>, new_secret: Vec<u8>) -> bool {
2026-02-15 13:14:50 +03:00
if new_secret.len() < 32 {
warn!(len = new_secret.len(), "proxy-secret update ignored (too short)");
return false;
}
let mut guard = self.proxy_secret.write().await;
if *guard != new_secret {
*guard = new_secret;
drop(guard);
self.reconnect_all().await;
return true;
}
false
}
2026-02-19 13:35:56 +03:00
pub async fn reconnect_all(self: &Arc<Self>) {
let ws = self.writers.read().await.clone();
for w in ws {
if let Ok(()) = self.connect_one(w.addr, self.rng.as_ref()).await {
self.mark_writer_draining(w.id).await;
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
2026-02-15 13:14:50 +03:00
}
2026-02-16 12:10:59 +03:00
/// Returns the first four bytes of the proxy secret as a little-endian `u32`,
/// or 0 when the secret is shorter than four bytes.
pub(super) async fn key_selector(&self) -> u32 {
    let secret = self.proxy_secret.read().await;
    match secret.as_slice() {
        [b0, b1, b2, b3, ..] => u32::from_le_bytes([*b0, *b1, *b2, *b3]),
        _ => 0,
    }
}
2026-02-18 06:01:52 +03:00
/// Lists the enabled address families, preferred one first.
///
/// A family is included only if its `*_me` flag in `decision` is set; IPv6
/// comes first when `decision.prefer_ipv6()` says so.
pub(super) fn family_order(&self) -> Vec<IpFamily> {
    let ipv6_first = self.decision.prefer_ipv6();
    let v4 = self.decision.ipv4_me.then_some(IpFamily::V4);
    let v6 = self.decision.ipv6_me.then_some(IpFamily::V6);

    let ranked = if ipv6_first { [v6, v4] } else { [v4, v6] };
    ranked.into_iter().flatten().collect()
}
/// Returns a cloned snapshot of the endpoint map for `family`.
async fn proxy_map_for_family(&self, family: IpFamily) -> HashMap<i32, Vec<(IpAddr, u16)>> {
    let source = match family {
        IpFamily::V4 => &self.proxy_map_v4,
        IpFamily::V6 => &self.proxy_map_v6,
    };
    source.read().await.clone()
}
2026-02-17 03:40:39 +03:00
pub async fn init(self: &Arc<Self>, pool_size: usize, rng: &Arc<SecureRandom>) -> Result<()> {
2026-02-18 06:01:52 +03:00
let family_order = self.family_order();
2026-02-15 13:14:50 +03:00
let ks = self.key_selector().await;
2026-02-14 01:36:14 +03:00
info!(
2026-02-18 06:01:52 +03:00
me_servers = self.proxy_map_v4.read().await.len(),
2026-02-14 01:36:14 +03:00
pool_size,
key_selector = format_args!("0x{ks:08x}"),
2026-02-15 13:14:50 +03:00
secret_len = self.proxy_secret.read().await.len(),
2026-02-14 01:36:14 +03:00
"Initializing ME pool"
);
2026-02-18 06:01:52 +03:00
for family in family_order {
let map = self.proxy_map_for_family(family).await;
let dc_addrs: Vec<(i32, Vec<(IpAddr, u16)>)> = map
.iter()
.map(|(dc, addrs)| (*dc, addrs.clone()))
.collect();
// Ensure at least one connection per DC; run DCs in parallel.
let mut join = tokio::task::JoinSet::new();
2026-02-18 19:50:16 +03:00
let mut dc_failures = 0usize;
2026-02-18 06:01:52 +03:00
for (dc, addrs) in dc_addrs.iter().cloned() {
if addrs.is_empty() {
continue;
}
let pool = Arc::clone(self);
let rng_clone = Arc::clone(rng);
join.spawn(async move {
2026-02-18 19:50:16 +03:00
pool.connect_primary_for_dc(dc, addrs, rng_clone).await
2026-02-18 06:01:52 +03:00
});
2026-02-15 13:14:50 +03:00
}
2026-02-18 19:50:16 +03:00
while let Some(res) = join.join_next().await {
if let Ok(false) = res {
dc_failures += 1;
}
}
if dc_failures > 2 {
return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into()));
}
2026-02-15 13:14:50 +03:00
2026-02-19 15:39:30 +03:00
// Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles.
if self.me_warmup_stagger_enabled {
for (dc, addrs) in dc_addrs.iter() {
for (ip, port) in addrs {
if self.connection_count() >= pool_size {
break;
}
let addr = SocketAddr::new(*ip, *port);
let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
2026-02-22 23:53:10 +03:00
let delay_ms = self.me_warmup_step_delay.as_millis() as u64 + jitter;
2026-02-19 15:39:30 +03:00
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
if let Err(e) = self.connect_one(addr, rng.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
}
}
}
} else {
2026-02-18 06:01:52 +03:00
for (dc, addrs) in dc_addrs.iter() {
for (ip, port) in addrs {
if self.connection_count() >= pool_size {
break;
}
let addr = SocketAddr::new(*ip, *port);
if let Err(e) = self.connect_one(addr, rng.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed");
}
}
2026-02-15 13:14:50 +03:00
if self.connection_count() >= pool_size {
break;
}
2026-02-14 01:36:14 +03:00
}
2026-02-19 15:39:30 +03:00
}
2026-02-18 06:01:52 +03:00
if !self.decision.effective_multipath && self.connection_count() > 0 {
2026-02-14 01:36:14 +03:00
break;
}
}
if self.writers.read().await.is_empty() {
return Err(ProxyError::Proxy("No ME connections".into()));
}
Ok(())
}
2026-02-17 03:40:39 +03:00
/// Connects to one middle-proxy endpoint and registers it as a pool writer.
///
/// Steps, in order: verify the stored proxy secret is at least 32 bytes, open
/// the TCP connection, run the handshake, spawn the writer task that drains
/// the command channel, add the `MeWriter` to the pool, then spawn the reader
/// task and the ping task.
///
/// The new writer is tagged with the pool generation that is current after
/// the handshake has finished.
///
/// # Errors
/// Returns `ProxyError::Proxy` when the secret is too short and propagates
/// any error from `connect_tcp` or `handshake_only`.
pub(crate) async fn connect_one(self: &Arc<Self>, addr: SocketAddr, rng: &SecureRandom) -> Result<()> {
2026-02-16 12:10:59 +03:00
let secret_len = self.proxy_secret.read().await.len();
if secret_len < 32 {
return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
2026-02-14 01:36:14 +03:00
}
2026-02-16 12:10:59 +03:00
let (stream, _connect_ms) = self.connect_tcp(addr).await?;
let hs = self.handshake_only(stream, addr, rng).await?;
2026-02-14 01:36:14 +03:00
2026-02-17 03:40:39 +03:00
let writer_id = self.next_writer_id.fetch_add(1, Ordering::Relaxed);
2026-02-24 00:04:12 +03:00
let generation = self.current_generation();
2026-02-17 03:40:39 +03:00
let cancel = CancellationToken::new();
let degraded = Arc::new(AtomicBool::new(false));
let draining = Arc::new(AtomicBool::new(false));
2026-02-24 00:04:12 +03:00
let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0));
let allow_drain_fallback = Arc::new(AtomicBool::new(false));
2026-02-19 13:35:56 +03:00
// Bounded command queue (4096 entries) between producers and the writer task.
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
let mut rpc_writer = RpcWriter {
2026-02-16 12:10:59 +03:00
writer: hs.wr,
key: hs.write_key,
iv: hs.write_iv,
2026-02-14 01:36:14 +03:00
seq_no: 0,
2026-02-23 02:28:00 +03:00
crc_mode: hs.crc_mode,
2026-02-19 13:35:56 +03:00
};
let cancel_wr = cancel.clone();
// Writer task: sole owner of `rpc_writer`. It ends on a send error, on
// `Close`, when all senders are dropped, or when the token is cancelled.
tokio::spawn(async move {
loop {
tokio::select! {
cmd = rx.recv() => {
match cmd {
Some(WriterCommand::Data(payload)) => {
if rpc_writer.send(&payload).await.is_err() { break; }
}
Some(WriterCommand::DataAndFlush(payload)) => {
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
}
Some(WriterCommand::Close) | None => break,
}
}
_ = cancel_wr.cancelled() => break,
}
}
});
2026-02-17 03:40:39 +03:00
let writer = MeWriter {
id: writer_id,
addr,
2026-02-24 00:04:12 +03:00
generation,
2026-02-19 13:35:56 +03:00
tx: tx.clone(),
2026-02-17 03:40:39 +03:00
cancel: cancel.clone(),
degraded: degraded.clone(),
draining: draining.clone(),
2026-02-24 00:04:12 +03:00
draining_started_at_epoch_secs: draining_started_at_epoch_secs.clone(),
allow_drain_fallback: allow_drain_fallback.clone(),
2026-02-17 03:40:39 +03:00
};
// Publish the writer, bump the connection counter and wake one waiter.
self.writers.write().await.push(writer.clone());
2026-02-19 13:35:56 +03:00
self.conn_count.fetch_add(1, Ordering::Relaxed);
2026-02-23 03:20:13 +03:00
self.writer_available.notify_one();
2026-02-14 01:36:14 +03:00
let reg = self.registry.clone();
2026-02-17 03:40:39 +03:00
let writers_arc = self.writers_arc();
let ping_tracker = self.ping_tracker.clone();
2026-02-23 02:28:00 +03:00
let ping_tracker_reader = ping_tracker.clone();
2026-02-17 03:40:39 +03:00
let rtt_stats = self.rtt_stats.clone();
2026-02-23 02:28:00 +03:00
let stats_reader = self.stats.clone();
let stats_ping = self.stats.clone();
2026-02-17 03:40:39 +03:00
// Tasks hold weak references so they do not keep the pool alive.
let pool = Arc::downgrade(self);
let cancel_ping = cancel.clone();
2026-02-19 13:35:56 +03:00
let tx_ping = tx.clone();
2026-02-17 03:40:39 +03:00
let ping_tracker_ping = ping_tracker.clone();
2026-02-19 13:35:56 +03:00
// Shared once-flag: whichever task wins the compare-exchange runs the
// pool-level cleanup for this writer.
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_reader = cleanup_done.clone();
let cleanup_for_ping = cleanup_done.clone();
2026-02-19 15:39:30 +03:00
let keepalive_enabled = self.me_keepalive_enabled;
let keepalive_interval = self.me_keepalive_interval;
let keepalive_jitter = self.me_keepalive_jitter;
let cancel_reader_token = cancel.clone();
let cancel_ping_token = cancel_ping.clone();
2026-02-17 03:40:39 +03:00
2026-02-14 01:36:14 +03:00
// Reader task: runs `reader_loop` until it returns, then cleans up.
tokio::spawn(async move {
2026-02-17 03:40:39 +03:00
let res = reader_loop(
hs.rd,
hs.read_key,
hs.read_iv,
2026-02-23 02:28:00 +03:00
hs.crc_mode,
2026-02-17 03:40:39 +03:00
reg.clone(),
BytesMut::new(),
BytesMut::new(),
2026-02-19 13:35:56 +03:00
tx.clone(),
2026-02-23 02:28:00 +03:00
ping_tracker_reader,
2026-02-17 03:40:39 +03:00
rtt_stats.clone(),
2026-02-23 02:28:00 +03:00
stats_reader,
2026-02-17 03:40:39 +03:00
writer_id,
degraded.clone(),
2026-02-19 15:39:30 +03:00
cancel_reader_token.clone(),
2026-02-17 03:40:39 +03:00
)
.await;
2026-02-24 05:57:53 +03:00
if let Some(pool) = pool.upgrade()
&& cleanup_for_reader
2026-02-19 13:35:56 +03:00
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
2026-02-24 05:57:53 +03:00
{
pool.remove_writer_and_close_clients(writer_id).await;
2026-02-17 03:40:39 +03:00
}
if let Err(e) = res {
2026-02-14 01:36:14 +03:00
warn!(error = %e, "ME reader ended");
}
2026-02-17 03:40:39 +03:00
// NOTE(review): this raw `retain` bypasses `remove_writer_only`. If it removes
// the entry before a concurrent cleanup from the ping task gets the write lock,
// `conn_count` is not decremented and no `Close` is sent - confirm whether this
// fallback is still needed.
let mut ws = writers_arc.write().await;
ws.retain(|w| w.id != writer_id);
2026-02-14 01:36:14 +03:00
info!(remaining = ws.len(), "Dead ME writer removed from pool");
});
2026-02-17 03:40:39 +03:00
let pool_ping = Arc::downgrade(self);
2026-02-14 01:51:10 +03:00
// Ping task: periodically sends an RPC ping through the writer channel.
tokio::spawn(async move {
// Ping ids start at a random value and increase by one per ping.
let mut ping_id: i64 = rand::random::<i64>();
2026-02-23 03:41:51 +03:00
// Per-writer jittered start to avoid phase sync.
let startup_jitter = if keepalive_enabled {
// Jitter is capped at half the interval and is at least 1 ms.
let jitter_cap_ms = keepalive_interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
} else {
2026-02-14 04:19:44 +03:00
let jitter = rand::rng()
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
2026-02-23 03:41:51 +03:00
Duration::from_secs(wait)
};
tokio::select! {
_ = cancel_ping_token.cancelled() => return,
_ = tokio::time::sleep(startup_jitter) => {}
}
loop {
// Keepalive mode: interval plus capped jitter. Legacy mode: base
// interval +/- jitter, never below 5 seconds.
let wait = if keepalive_enabled {
let jitter_cap_ms = keepalive_interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
keepalive_interval
+ Duration::from_millis(
rand::rng().random_range(0..=effective_jitter_ms as u64)
)
} else {
let jitter = rand::rng()
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
Duration::from_secs(secs)
};
2026-02-17 03:40:39 +03:00
tokio::select! {
2026-02-19 15:39:30 +03:00
_ = cancel_ping_token.cancelled() => {
2026-02-17 03:40:39 +03:00
break;
}
2026-02-23 03:41:51 +03:00
_ = tokio::time::sleep(wait) => {}
2026-02-17 03:40:39 +03:00
}
2026-02-18 06:01:52 +03:00
let sent_id = ping_id;
2026-02-14 01:51:10 +03:00
// Payload: ping opcode followed by the 8-byte ping id, both little-endian.
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
2026-02-18 06:01:52 +03:00
p.extend_from_slice(&sent_id.to_le_bytes());
2026-02-17 03:40:39 +03:00
{
let mut tracker = ping_tracker_ping.lock().await;
2026-02-23 02:28:00 +03:00
let before = tracker.len();
2026-02-19 13:35:56 +03:00
// Entries older than 120 s are dropped and counted as keepalive timeouts.
// The tracker is shared by the whole pool, so this may expire pings that
// belong to other writers as well.
tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
2026-02-23 02:28:00 +03:00
let expired = before.saturating_sub(tracker.len());
if expired > 0 {
stats_ping.increment_me_keepalive_timeout_by(expired as u64);
}
2026-02-18 06:01:52 +03:00
tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
2026-02-17 03:40:39 +03:00
}
2026-02-18 06:01:52 +03:00
ping_id = ping_id.wrapping_add(1);
2026-02-23 03:41:51 +03:00
stats_ping.increment_me_keepalive_sent();
2026-02-19 13:35:56 +03:00
// A failed send means the writer task has gone away; tear the writer down.
if tx_ping.send(WriterCommand::DataAndFlush(p)).await.is_err() {
2026-02-23 03:41:51 +03:00
stats_ping.increment_me_keepalive_failed();
debug!("ME ping failed, removing dead writer");
2026-02-17 03:40:39 +03:00
cancel_ping.cancel();
2026-02-24 05:57:53 +03:00
if let Some(pool) = pool_ping.upgrade()
&& cleanup_for_ping
2026-02-19 13:35:56 +03:00
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
2026-02-24 05:57:53 +03:00
{
pool.remove_writer_and_close_clients(writer_id).await;
2026-02-17 03:40:39 +03:00
}
2026-02-14 01:51:10 +03:00
break;
2026-02-14 01:36:14 +03:00
}
}
2026-02-14 01:51:10 +03:00
});
2026-02-14 01:36:14 +03:00
Ok(())
}
2026-02-17 03:40:39 +03:00
async fn connect_primary_for_dc(
self: Arc<Self>,
dc: i32,
mut addrs: Vec<(IpAddr, u16)>,
rng: Arc<SecureRandom>,
2026-02-18 19:50:16 +03:00
) -> bool {
2026-02-17 03:40:39 +03:00
if addrs.is_empty() {
2026-02-18 19:50:16 +03:00
return false;
2026-02-17 03:40:39 +03:00
}
addrs.shuffle(&mut rand::rng());
for (ip, port) in addrs {
let addr = SocketAddr::new(ip, port);
match self.connect_one(addr, rng.as_ref()).await {
Ok(()) => {
info!(%addr, dc = %dc, "ME connected");
2026-02-18 19:50:16 +03:00
return true;
2026-02-17 03:40:39 +03:00
}
Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"),
}
}
warn!(dc = %dc, "All ME servers for DC failed at init");
2026-02-18 19:50:16 +03:00
false
2026-02-17 03:40:39 +03:00
}
2026-02-19 13:35:56 +03:00
/// Removes a writer from the pool and closes every client connection that
/// was bound to it.
///
/// Each orphaned connection first gets a `Close` response routed to it and is
/// then unregistered; failures of either step are ignored.
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
    let orphaned = self.remove_writer_only(writer_id).await;
    for bound in orphaned {
        let conn_id = bound.conn_id;
        let _ = self.registry.route(conn_id, super::MeResponse::Close).await;
        let _ = self.registry.unregister(conn_id).await;
    }
}
async fn remove_writer_only(&self, writer_id: u64) -> Vec<BoundConn> {
2026-02-23 03:41:51 +03:00
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
2026-02-17 03:40:39 +03:00
{
let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
let w = ws.remove(pos);
2026-02-24 00:04:12 +03:00
if w.draining.load(Ordering::Relaxed) {
self.stats.decrement_pool_drain_active();
}
2026-02-17 03:40:39 +03:00
w.cancel.cancel();
2026-02-23 03:41:51 +03:00
close_tx = Some(w.tx.clone());
2026-02-19 13:35:56 +03:00
self.conn_count.fetch_sub(1, Ordering::Relaxed);
2026-02-17 03:40:39 +03:00
}
}
2026-02-23 03:41:51 +03:00
if let Some(tx) = close_tx {
let _ = tx.send(WriterCommand::Close).await;
}
2026-02-19 13:35:56 +03:00
self.rtt_stats.lock().await.remove(&writer_id);
2026-02-17 03:40:39 +03:00
self.registry.writer_lost(writer_id).await
}
pub(crate) async fn mark_writer_draining_with_timeout(
self: &Arc<Self>,
writer_id: u64,
timeout: Option<Duration>,
2026-02-24 00:04:12 +03:00
allow_drain_fallback: bool,
) {
let timeout = timeout.filter(|d| !d.is_zero());
let found = {
let mut ws = self.writers.write().await;
if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) {
2026-02-24 00:04:12 +03:00
let already_draining = w.draining.swap(true, Ordering::Relaxed);
w.allow_drain_fallback
.store(allow_drain_fallback, Ordering::Relaxed);
w.draining_started_at_epoch_secs
.store(Self::now_epoch_secs(), Ordering::Relaxed);
if !already_draining {
self.stats.increment_pool_drain_active();
}
w.draining.store(true, Ordering::Relaxed);
true
} else {
false
2026-02-17 03:40:39 +03:00
}
};
if !found {
return;
}
2026-02-17 03:40:39 +03:00
let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0);
2026-02-24 00:04:12 +03:00
debug!(
writer_id,
timeout_secs,
allow_drain_fallback,
"ME writer marked draining"
);
let pool = Arc::downgrade(self);
tokio::spawn(async move {
let deadline = timeout.map(|t| Instant::now() + t);
2026-02-24 05:57:53 +03:00
while let Some(p) = pool.upgrade() {
if let Some(deadline_at) = deadline
&& Instant::now() >= deadline_at
{
warn!(writer_id, "Drain timeout, force-closing");
p.stats.increment_pool_force_close_total();
let _ = p.remove_writer_and_close_clients(writer_id).await;
break;
}
if p.registry.is_writer_empty(writer_id).await {
let _ = p.remove_writer_only(writer_id).await;
break;
2026-02-17 03:40:39 +03:00
}
2026-02-24 05:57:53 +03:00
tokio::time::sleep(Duration::from_secs(1)).await;
2026-02-17 03:40:39 +03:00
}
});
2026-02-17 03:40:39 +03:00
}
pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) {
2026-02-24 00:04:12 +03:00
self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300)), false)
.await;
}
2026-02-24 00:04:12 +03:00
/// Decides whether new client connections may be bound to `writer`.
///
/// * Not draining: always accepted.
/// * Draining without fallback permission: rejected.
/// * Draining with fallback: accepted when the drain TTL is 0 (no limit),
///   otherwise only while the time since draining started is within the TTL.
///   A missing start time (0) rejects.
pub(super) fn writer_accepts_new_binding(&self, writer: &MeWriter) -> bool {
    let draining = writer.draining.load(Ordering::Relaxed);
    if !draining {
        return true;
    }
    if !writer.allow_drain_fallback.load(Ordering::Relaxed) {
        return false;
    }

    match self.me_pool_drain_ttl_secs.load(Ordering::Relaxed) {
        0 => true,
        ttl_secs => match writer.draining_started_at_epoch_secs.load(Ordering::Relaxed) {
            0 => false,
            started => Self::now_epoch_secs().saturating_sub(started) <= ttl_secs,
        },
    }
}
2026-02-14 01:36:14 +03:00
}
2026-02-14 04:19:44 +03:00
2026-02-24 03:40:59 +03:00
/// Formats up to the first 64 bytes of `data` as space-separated lowercase hex.
///
/// When `data` is longer than 64 bytes the output ends with `…` so that a
/// truncated dump can be told apart from a complete one.
// Debugging helper that is not referenced in every build.
#[allow(dead_code)]
fn hex_dump(data: &[u8]) -> String {
    use std::fmt::Write as _;

    const MAX: usize = 64;
    const TRUNCATION_MARK: &str = "…";

    let shown = data.len().min(MAX);
    // Two hex digits plus one separator per byte, plus room for the marker.
    let mut out = String::with_capacity(shown * 3 + TRUNCATION_MARK.len());
    for (i, b) in data.iter().take(MAX).enumerate() {
        if i > 0 {
            out.push(' ');
        }
        // Writing into a `String` cannot fail.
        let _ = write!(out, "{b:02x}");
    }
    if data.len() > MAX {
        out.push_str(TRUNCATION_MARK);
    }
    out
}