Files
telemt/src/transport/middle_proxy/pool_refill.rs
T

281 lines
9.4 KiB
Rust
Raw Normal View History

2026-02-26 19:01:24 +03:00
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
2026-03-02 00:39:18 +03:00
use std::time::{Duration, Instant};
2026-02-26 19:01:24 +03:00
use tracing::{debug, info, warn};
use crate::crypto::SecureRandom;
2026-03-02 20:41:51 +03:00
use crate::network::IpFamily;
2026-02-26 19:01:24 +03:00
2026-03-07 13:32:02 +03:00
use super::pool::{MePool, RefillDcKey, RefillEndpointKey, WriterContour};
/// Length, in seconds, of the quarantine placed on an endpoint whose writer flapped.
const ME_FLAP_QUARANTINE_SECS: u64 = 25;
/// A lost writer whose uptime was at or below this many seconds is treated as a flap.
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;

impl MePool {
2026-03-02 00:39:18 +03:00
pub(super) async fn maybe_quarantine_flapping_endpoint(
&self,
addr: SocketAddr,
uptime: Duration,
) {
if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) {
return;
}
let until = Instant::now() + Duration::from_secs(ME_FLAP_QUARANTINE_SECS);
let mut guard = self.endpoint_quarantine.lock().await;
guard.retain(|_, expiry| *expiry > Instant::now());
guard.insert(addr, until);
2026-03-02 20:41:51 +03:00
self.stats.increment_me_endpoint_quarantine_total();
2026-03-02 00:39:18 +03:00
warn!(
%addr,
uptime_ms = uptime.as_millis(),
quarantine_secs = ME_FLAP_QUARANTINE_SECS,
"ME endpoint temporarily quarantined due to rapid writer flap"
);
}
2026-03-03 03:03:44 +03:00
pub(super) async fn is_endpoint_quarantined(&self, addr: SocketAddr) -> bool {
2026-03-02 00:39:18 +03:00
let mut guard = self.endpoint_quarantine.lock().await;
let now = Instant::now();
guard.retain(|_, expiry| *expiry > now);
guard.contains_key(&addr)
}
/// Filters `endpoints` down to the ones that are not currently quarantined, keeping input order.
///
/// If every endpoint is quarantined, returns just the one whose quarantine expires first
/// (the first such endpoint on ties), so callers always have something to try. Returns an
/// empty vector only for empty input. Expired quarantine entries are pruned first.
async fn connectable_endpoints(&self, endpoints: &[SocketAddr]) -> Vec<SocketAddr> {
    if endpoints.is_empty() {
        return Vec::new();
    }

    let mut quarantine = self.endpoint_quarantine.lock().await;
    let now = Instant::now();
    quarantine.retain(|_, expires_at| *expires_at > now);

    let mut usable: Vec<SocketAddr> = Vec::with_capacity(endpoints.len());
    let mut soonest: Option<(SocketAddr, Instant)> = None;
    for &addr in endpoints {
        match quarantine.get(&addr) {
            None => usable.push(addr),
            Some(&expires_at) => {
                // Only a strictly earlier expiry replaces the current pick.
                let is_sooner = match soonest {
                    Some((_, best)) => expires_at < best,
                    None => true,
                };
                if is_sooner {
                    soonest = Some((addr, expires_at));
                }
            }
        }
    }

    if !usable.is_empty() {
        return usable;
    }

    match soonest {
        Some((addr, expires_at)) => {
            debug!(
                %addr,
                wait_ms = expires_at.saturating_duration_since(now).as_millis(),
                "All ME endpoints are quarantined for the DC group; retrying earliest one"
            );
            vec![addr]
        }
        None => Vec::new(),
    }
}
2026-03-07 13:32:02 +03:00
pub(super) async fn has_refill_inflight_for_dc_key(&self, key: RefillDcKey) -> bool {
2026-03-02 20:41:51 +03:00
let guard = self.refill_inflight_dc.lock().await;
2026-03-07 13:32:02 +03:00
guard.contains(&key)
2026-03-02 00:39:18 +03:00
}
2026-02-26 19:01:24 +03:00
pub(super) async fn connect_endpoints_round_robin(
self: &Arc<Self>,
2026-03-07 13:32:02 +03:00
dc: i32,
2026-02-26 19:01:24 +03:00
endpoints: &[SocketAddr],
rng: &SecureRandom,
2026-03-02 20:41:51 +03:00
) -> bool {
self.connect_endpoints_round_robin_with_generation_contour(
2026-03-07 13:32:02 +03:00
dc,
2026-03-02 20:41:51 +03:00
endpoints,
rng,
self.current_generation(),
WriterContour::Active,
)
.await
}
pub(super) async fn connect_endpoints_round_robin_with_generation_contour(
self: &Arc<Self>,
2026-03-07 13:32:02 +03:00
dc: i32,
2026-03-02 20:41:51 +03:00
endpoints: &[SocketAddr],
rng: &SecureRandom,
generation: u64,
contour: WriterContour,
2026-02-26 19:01:24 +03:00
) -> bool {
2026-03-02 00:39:18 +03:00
let candidates = self.connectable_endpoints(endpoints).await;
if candidates.is_empty() {
2026-02-26 19:01:24 +03:00
return false;
}
2026-03-02 00:39:18 +03:00
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len();
for offset in 0..candidates.len() {
let idx = (start + offset) % candidates.len();
let addr = candidates[idx];
2026-03-02 20:41:51 +03:00
match self
2026-03-07 13:32:02 +03:00
.connect_one_with_generation_contour_for_dc(addr, rng, generation, contour, dc)
2026-03-02 20:41:51 +03:00
.await
{
2026-02-26 19:01:24 +03:00
Ok(()) => return true,
Err(e) => debug!(%addr, error = %e, "ME connect failed during round-robin warmup"),
}
}
false
}
2026-03-07 13:32:02 +03:00
async fn endpoints_for_dc(&self, target_dc: i32) -> Vec<SocketAddr> {
2026-02-26 19:01:24 +03:00
let mut endpoints = HashSet::<SocketAddr>::new();
if self.decision.ipv4_me {
2026-03-07 03:22:01 +03:00
let map = self.proxy_map_v4.read().await;
2026-03-06 19:59:23 +03:00
if let Some(addrs) = map.get(&target_dc) {
for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port));
2026-02-26 19:01:24 +03:00
}
}
}
if self.decision.ipv6_me {
2026-03-07 03:22:01 +03:00
let map = self.proxy_map_v6.read().await;
2026-03-06 19:59:23 +03:00
if let Some(addrs) = map.get(&target_dc) {
for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port));
2026-02-26 19:01:24 +03:00
}
}
}
let mut sorted: Vec<SocketAddr> = endpoints.into_iter().collect();
sorted.sort_unstable();
sorted
}
2026-03-07 13:32:02 +03:00
async fn refill_writer_after_loss(self: &Arc<Self>, addr: SocketAddr, writer_dc: i32) -> bool {
2026-02-26 19:01:24 +03:00
let fast_retries = self.me_reconnect_fast_retry_count.max(1);
2026-03-02 00:39:18 +03:00
let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await;
2026-02-26 19:01:24 +03:00
2026-03-02 00:39:18 +03:00
if !same_endpoint_quarantined {
for attempt in 0..fast_retries {
self.stats.increment_me_reconnect_attempt();
2026-03-07 13:32:02 +03:00
match self.connect_one_for_dc(addr, writer_dc, self.rng.as_ref()).await {
2026-03-02 00:39:18 +03:00
Ok(()) => {
self.stats.increment_me_reconnect_success();
self.stats.increment_me_writer_restored_same_endpoint_total();
info!(
%addr,
attempt = attempt + 1,
"ME writer restored on the same endpoint"
);
return true;
}
Err(e) => {
debug!(
%addr,
attempt = attempt + 1,
error = %e,
"ME immediate same-endpoint reconnect failed"
);
}
2026-02-26 19:01:24 +03:00
}
}
2026-03-02 00:39:18 +03:00
} else {
debug!(
%addr,
"Skipping immediate same-endpoint reconnect because endpoint is quarantined"
);
2026-02-26 19:01:24 +03:00
}
2026-03-07 13:32:02 +03:00
let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
2026-02-26 19:01:24 +03:00
if dc_endpoints.is_empty() {
self.stats.increment_me_refill_failed_total();
return false;
}
for attempt in 0..fast_retries {
self.stats.increment_me_reconnect_attempt();
if self
2026-03-07 13:32:02 +03:00
.connect_endpoints_round_robin(writer_dc, &dc_endpoints, self.rng.as_ref())
2026-02-26 19:01:24 +03:00
.await
{
self.stats.increment_me_reconnect_success();
self.stats.increment_me_writer_restored_fallback_total();
info!(
%addr,
attempt = attempt + 1,
"ME writer restored via DC fallback endpoint"
);
return true;
}
}
self.stats.increment_me_refill_failed_total();
false
}
2026-03-07 13:32:02 +03:00
pub(crate) fn trigger_immediate_refill_for_dc(self: &Arc<Self>, addr: SocketAddr, writer_dc: i32) {
let endpoint_key = RefillEndpointKey {
dc: writer_dc,
addr,
};
let pre_inserted = if let Ok(mut guard) = self.refill_inflight.try_lock() {
if !guard.insert(endpoint_key) {
self.stats.increment_me_refill_skipped_inflight_total();
return;
}
true
} else {
false
};
let pool = Arc::clone(self);
tokio::spawn(async move {
let dc_key = RefillDcKey {
dc: writer_dc,
family: if addr.is_ipv4() {
IpFamily::V4
} else {
IpFamily::V6
},
};
if !pre_inserted {
2026-02-26 19:01:24 +03:00
let mut guard = pool.refill_inflight.lock().await;
2026-03-07 13:32:02 +03:00
if !guard.insert(endpoint_key) {
2026-02-26 19:01:24 +03:00
pool.stats.increment_me_refill_skipped_inflight_total();
return;
}
}
2026-03-02 20:41:51 +03:00
2026-03-07 13:32:02 +03:00
{
2026-03-02 20:41:51 +03:00
let mut dc_guard = pool.refill_inflight_dc.lock().await;
2026-03-07 13:32:02 +03:00
if dc_guard.contains(&dc_key) {
2026-03-02 20:41:51 +03:00
pool.stats.increment_me_refill_skipped_inflight_total();
drop(dc_guard);
let mut guard = pool.refill_inflight.lock().await;
2026-03-07 13:32:02 +03:00
guard.remove(&endpoint_key);
2026-03-02 20:41:51 +03:00
return;
}
2026-03-07 13:32:02 +03:00
dc_guard.insert(dc_key);
2026-03-02 20:41:51 +03:00
}
2026-02-26 19:01:24 +03:00
pool.stats.increment_me_refill_triggered_total();
2026-03-07 13:32:02 +03:00
let restored = pool.refill_writer_after_loss(addr, writer_dc).await;
2026-02-26 19:01:24 +03:00
if !restored {
2026-03-07 13:32:02 +03:00
warn!(%addr, dc = writer_dc, "ME immediate refill failed");
2026-02-26 19:01:24 +03:00
}
let mut guard = pool.refill_inflight.lock().await;
2026-03-07 13:32:02 +03:00
guard.remove(&endpoint_key);
2026-03-02 20:41:51 +03:00
drop(guard);
2026-03-07 13:32:02 +03:00
let mut dc_guard = pool.refill_inflight_dc.lock().await;
dc_guard.remove(&dc_key);
2026-02-26 19:01:24 +03:00
});
}
}