Files
telemt/src/transport/middle_proxy/send.rs
T

303 lines
12 KiB
Rust
Raw Normal View History

2026-03-01 03:36:00 +03:00
use std::cmp::Reverse;
2026-02-14 01:51:10 +03:00
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::Ordering;
2026-02-17 04:16:16 +03:00
use std::time::Duration;
2026-02-14 01:51:10 +03:00
2026-03-01 03:36:00 +03:00
use tokio::sync::mpsc::error::TrySendError;
2026-02-14 01:51:10 +03:00
use tracing::{debug, warn};
use crate::error::{ProxyError, Result};
2026-02-18 06:01:52 +03:00
use crate::network::IpFamily;
2026-02-15 14:02:00 +03:00
use crate::protocol::constants::RPC_CLOSE_EXT_U32;
2026-02-14 01:51:10 +03:00
use super::MePool;
2026-02-19 13:35:56 +03:00
use super::codec::WriterCommand;
2026-02-14 01:51:10 +03:00
use super::wire::build_proxy_req_payload;
2026-02-15 14:02:00 +03:00
use rand::seq::SliceRandom;
2026-02-17 03:40:39 +03:00
use super::registry::ConnMeta;
2026-02-14 01:51:10 +03:00
impl MePool {
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
2026-02-14 01:51:10 +03:00
pub async fn send_proxy_req(
2026-02-17 03:40:39 +03:00
self: &Arc<Self>,
2026-02-14 01:51:10 +03:00
conn_id: u64,
target_dc: i16,
2026-02-14 01:51:10 +03:00
client_addr: SocketAddr,
our_addr: SocketAddr,
data: &[u8],
proto_flags: u32,
tag_override: Option<&[u8]>,
2026-02-14 01:51:10 +03:00
) -> Result<()> {
let tag = tag_override.or(self.proxy_tag.as_deref());
2026-02-14 01:51:10 +03:00
let payload = build_proxy_req_payload(
conn_id,
client_addr,
our_addr,
data,
tag,
2026-02-14 01:51:10 +03:00
proto_flags,
);
2026-02-17 03:40:39 +03:00
let meta = ConnMeta {
target_dc,
client_addr,
our_addr,
proto_flags,
};
2026-02-17 04:16:16 +03:00
let mut emergency_attempts = 0;
2026-02-14 01:51:10 +03:00
loop {
2026-02-17 03:40:39 +03:00
if let Some(current) = self.registry.get_writer(conn_id).await {
2026-03-01 03:36:00 +03:00
match current.tx.try_send(WriterCommand::Data(payload.clone())) {
2026-02-17 03:40:39 +03:00
Ok(()) => return Ok(()),
2026-03-01 03:36:00 +03:00
Err(TrySendError::Full(cmd)) => {
if current.tx.send(cmd).await.is_ok() {
return Ok(());
}
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
continue;
}
Err(TrySendError::Closed(_)) => {
2026-02-19 13:35:56 +03:00
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
2026-02-17 03:40:39 +03:00
continue;
}
}
2026-02-14 01:51:10 +03:00
}
2026-02-17 03:40:39 +03:00
let mut writers_snapshot = {
let ws = self.writers.read().await;
if ws.is_empty() {
2026-02-23 03:20:13 +03:00
// Create waiter before recovery attempts so notify_one permits are not missed.
let waiter = self.writer_available.notified();
2026-02-19 13:35:56 +03:00
drop(ws);
for family in self.family_order() {
let map = match family {
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
};
for (_dc, addrs) in map.iter() {
for (ip, port) in addrs {
let addr = SocketAddr::new(*ip, *port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
2026-02-23 03:20:13 +03:00
self.writer_available.notify_one();
2026-02-19 13:35:56 +03:00
break;
}
}
}
}
2026-02-23 03:20:13 +03:00
if !self.writers.read().await.is_empty() {
continue;
}
if tokio::time::timeout(Duration::from_secs(3), waiter).await.is_err() {
if !self.writers.read().await.is_empty() {
continue;
}
2026-02-19 13:35:56 +03:00
return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
}
continue;
2026-02-17 03:40:39 +03:00
}
ws.clone()
};
let mut candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
if candidate_indices.is_empty() {
2026-02-17 03:40:39 +03:00
// Emergency connect-on-demand
2026-02-17 04:16:16 +03:00
if emergency_attempts >= 3 {
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
}
emergency_attempts += 1;
for family in self.family_order() {
let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await,
};
if let Some(addrs) = map_guard.get(&(target_dc as i32)) {
let mut shuffled = addrs.clone();
shuffled.shuffle(&mut rand::rng());
drop(map_guard);
for (ip, port) in shuffled {
let addr = SocketAddr::new(ip, port);
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
break;
}
2026-02-15 14:02:00 +03:00
}
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts)).await;
let ws2 = self.writers.read().await;
writers_snapshot = ws2.clone();
drop(ws2);
candidate_indices = self.candidate_indices_for_dc(&writers_snapshot, target_dc).await;
2026-02-19 13:35:56 +03:00
if !candidate_indices.is_empty() {
break;
}
2026-02-15 14:02:00 +03:00
}
}
if candidate_indices.is_empty() {
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
}
}
2026-02-17 03:40:39 +03:00
candidate_indices.sort_by_key(|idx| {
let w = &writers_snapshot[*idx];
let degraded = w.degraded.load(Ordering::Relaxed);
2026-02-24 00:04:12 +03:00
let stale = (w.generation < self.current_generation()) as usize;
2026-03-01 03:36:00 +03:00
(stale, degraded as usize, Reverse(w.tx.capacity()))
2026-02-17 03:40:39 +03:00
});
let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len();
2026-03-01 03:36:00 +03:00
let mut fallback_blocking_idx: Option<usize> = None;
for offset in 0..candidate_indices.len() {
2026-02-17 03:40:39 +03:00
let idx = candidate_indices[(start + offset) % candidate_indices.len()];
let w = &writers_snapshot[idx];
2026-02-24 00:04:12 +03:00
if !self.writer_accepts_new_binding(w) {
continue;
}
2026-03-01 03:36:00 +03:00
match w.tx.try_send(WriterCommand::Data(payload.clone())) {
Ok(()) => {
self.registry
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.await;
if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total();
debug!(
conn_id,
writer_id = w.id,
writer_generation = w.generation,
current_generation = self.current_generation(),
"Selected stale ME writer for fallback bind"
);
}
return Ok(());
}
Err(TrySendError::Full(_)) => {
if fallback_blocking_idx.is_none() {
fallback_blocking_idx = Some(idx);
}
}
Err(TrySendError::Closed(_)) => {
warn!(writer_id = w.id, "ME writer channel closed");
self.remove_writer_and_close_clients(w.id).await;
continue;
2026-02-24 00:04:12 +03:00
}
2026-02-14 01:51:10 +03:00
}
}
2026-03-01 03:36:00 +03:00
let Some(blocking_idx) = fallback_blocking_idx else {
continue;
};
let w = writers_snapshot[blocking_idx].clone();
2026-02-24 00:04:12 +03:00
if !self.writer_accepts_new_binding(&w) {
continue;
}
2026-02-19 13:35:56 +03:00
match w.tx.send(WriterCommand::Data(payload.clone())).await {
2026-02-17 03:40:39 +03:00
Ok(()) => {
self.registry
2026-02-19 13:35:56 +03:00
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
2026-02-17 03:40:39 +03:00
.await;
2026-02-24 00:04:12 +03:00
if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total();
}
2026-02-17 03:40:39 +03:00
return Ok(());
}
2026-02-19 13:35:56 +03:00
Err(_) => {
warn!(writer_id = w.id, "ME writer channel closed (blocking)");
self.remove_writer_and_close_clients(w.id).await;
2026-02-14 01:51:10 +03:00
}
}
}
}
2026-02-17 03:40:39 +03:00
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
2026-02-15 14:02:00 +03:00
if let Some(w) = self.registry.get_writer(conn_id).await {
2026-02-14 01:51:10 +03:00
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
2026-02-19 13:35:56 +03:00
if w.tx.send(WriterCommand::DataAndFlush(p)).await.is_err() {
debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await;
2026-02-14 01:51:10 +03:00
}
2026-02-15 14:02:00 +03:00
} else {
debug!(conn_id, "ME close skipped (writer missing)");
2026-02-14 01:51:10 +03:00
}
self.registry.unregister(conn_id).await;
Ok(())
}
pub fn connection_count(&self) -> usize {
2026-02-19 13:35:56 +03:00
self.conn_count.load(Ordering::Relaxed)
2026-02-14 01:51:10 +03:00
}
2026-02-15 13:14:50 +03:00
pub(super) async fn candidate_indices_for_dc(
&self,
2026-02-17 03:40:39 +03:00
writers: &[super::pool::MeWriter],
2026-02-15 13:14:50 +03:00
target_dc: i16,
) -> Vec<usize> {
let key = target_dc as i32;
2026-02-18 06:01:52 +03:00
let mut preferred = Vec::<SocketAddr>::new();
2026-02-18 06:01:52 +03:00
for family in self.family_order() {
let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await,
};
if let Some(v) = map_guard.get(&key) {
2026-02-15 13:14:50 +03:00
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
2026-02-18 06:01:52 +03:00
if preferred.is_empty() {
let abs = key.abs();
if let Some(v) = map_guard.get(&abs) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
2026-02-15 13:14:50 +03:00
}
2026-02-18 06:01:52 +03:00
if preferred.is_empty() {
let abs = key.abs();
if let Some(v) = map_guard.get(&-abs) {
2026-02-15 13:14:50 +03:00
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
}
2026-02-18 06:01:52 +03:00
if preferred.is_empty() {
let def = self.default_dc.load(Ordering::Relaxed);
2026-02-24 05:57:53 +03:00
if def != 0
&& let Some(v) = map_guard.get(&def)
{
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
2026-02-18 06:01:52 +03:00
}
}
drop(map_guard);
if !preferred.is_empty() && !self.decision.effective_multipath {
break;
}
}
2026-02-15 13:14:50 +03:00
if preferred.is_empty() {
return (0..writers.len())
2026-02-24 00:04:12 +03:00
.filter(|i| self.writer_accepts_new_binding(&writers[*i]))
.collect();
}
2026-02-15 13:14:50 +03:00
let mut out = Vec::new();
2026-02-17 03:40:39 +03:00
for (idx, w) in writers.iter().enumerate() {
2026-02-24 00:04:12 +03:00
if !self.writer_accepts_new_binding(w) {
continue;
}
2026-02-24 05:57:53 +03:00
if preferred.contains(&w.addr) {
2026-02-15 13:14:50 +03:00
out.push(idx);
}
}
if out.is_empty() {
return (0..writers.len())
2026-02-24 00:04:12 +03:00
.filter(|i| self.writer_accepts_new_binding(&writers[*i]))
.collect();
2026-02-15 13:14:50 +03:00
}
out
}
2026-02-15 13:14:50 +03:00
}