diff --git a/external/devp2p b/external/devp2p deleted file mode 160000 index bc76b9809..000000000 --- a/external/devp2p +++ /dev/null @@ -1 +0,0 @@ -Subproject commit bc76b9809a30e6dc5c8dcda996273f0f9bcf7108 diff --git a/src/config/nat_timeouts.rs b/src/config/nat_timeouts.rs index 861f7fa13..ea3fc4e7a 100644 --- a/src/config/nat_timeouts.rs +++ b/src/config/nat_timeouts.rs @@ -132,10 +132,15 @@ impl Default for RelayTimeouts { } /// Default time to wait for the peer to acknowledge stream data after a send. -const DEFAULT_SEND_ACK_TIMEOUT: Duration = Duration::from_secs(1); - -/// Fast-network send ACK timeout. -const FAST_SEND_ACK_TIMEOUT: Duration = Duration::from_millis(500); +/// +/// The actual timeout used on the send path is *adaptive*: it is computed as +/// `max(DEFAULT_SEND_ACK_TIMEOUT, data_len / 100_000)` seconds so that large +/// payloads on slow or newly-opened NAT-traversed connections are not +/// prematurely timed out. +const DEFAULT_SEND_ACK_TIMEOUT: Duration = Duration::from_secs(5); + +/// Fast-network send ACK timeout (for local/LAN networks). +const FAST_SEND_ACK_TIMEOUT: Duration = Duration::from_secs(2); /// Master timeout configuration #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/connection/mod.rs b/src/connection/mod.rs index 654a42f48..b68c90f0e 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -4802,10 +4802,13 @@ impl Connection { punch_me_now.round, punch_me_now.paired_with_sequence_number, punch_me_now.address ); - // v0.13.0: All nodes can coordinate - try coordination first + // v0.13.0: All nodes can coordinate - try coordination first. + // Only enter coordinator path if target_peer_id is present, meaning + // the sender wants us to relay to a target. When target_peer_id is None, + // this is a relayed frame and we are the target — fall through to the + // regular peer path below. 
if let Some(nat_state) = &self.nat_traversal { - // All nodes have bootstrap_coordinator now (v0.13.0) - if nat_state.bootstrap_coordinator.is_some() { + if nat_state.bootstrap_coordinator.is_some() && punch_me_now.target_peer_id.is_some() { // Process coordination request let from_peer_id = self.derive_peer_id_from_connection(); @@ -4832,6 +4835,7 @@ impl Connection { crate::shared::EndpointEventInner::RelayPunchMeNow( target_peer_id, coordination_frame, + self.path.remote, // sender's address for diagnostics ), ); } @@ -4851,6 +4855,10 @@ impl Connection { } // We're a regular peer receiving coordination from bootstrap + info!( + "Received PUNCH_ME_NOW coordination: round={}, address={}, from={}", + punch_me_now.round, punch_me_now.address, self.path.remote + ); let nat_state = self.nat_traversal.as_mut().ok_or_else(|| { TransportError::PROTOCOL_VIOLATION("PunchMeNow frame without NAT traversal negotiation") })?; @@ -4862,29 +4870,22 @@ impl Connection { TransportError::PROTOCOL_VIOLATION("Failed to handle peer punch request") })? { - trace!("Coordination synchronized for round {}", punch_me_now.round); + info!( + "Coordination synchronized for round {}, starting hole-punch to {}", + punch_me_now.round, punch_me_now.address + ); - // Create punch targets based on the received information - // The peer's address tells us where they'll be listening - let _local_addr = self - .local_ip - .map(|ip| SocketAddr::new(ip, 0)) - .unwrap_or_else(|| { - SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0) + // Emit an endpoint event to send NAT binding packets to the + // peer's address. This creates a bidirectional NAT binding so + // the peer's incoming QUIC connection can reach us. 
+ self.endpoint_events + .push_back(crate::shared::EndpointEventInner::InitiateHolePunch { + peer_address: punch_me_now.address, }); - - let target = nat_traversal::PunchTarget { - remote_addr: punch_me_now.address, - remote_sequence: punch_me_now.paired_with_sequence_number, - challenge: self.rng.r#gen(), - }; - - // Start coordination with this target - let _ = nat_state.start_coordination_round(vec![target], now); } else { - debug!( - "Failed to synchronize coordination for round {}", - punch_me_now.round + info!( + "Failed to synchronize coordination for round {} (peer: {})", + punch_me_now.round, self.path.remote ); } diff --git a/src/connection/nat_traversal.rs b/src/connection/nat_traversal.rs index 11e4b2928..5bbbeade3 100644 --- a/src/connection/nat_traversal.rs +++ b/src/connection/nat_traversal.rs @@ -2679,6 +2679,30 @@ impl NatTraversalState { ); return Err(NatTraversalError::SuspiciousCoordination); } + // If there's an existing coordination that's stale (not in an active + // negotiation phase), reset it so a new PUNCH_ME_NOW can be processed. + let should_reset = if let Some(coord) = &self.coordination { + let stale = !matches!( + coord.state, + CoordinationPhase::Coordinating | CoordinationPhase::Requesting + ) || coord.round != peer_round; + // Active coordination for the same round — don't reset + if !stale && coord.round == peer_round { + false + } else { + stale + } + } else { + false + }; + if should_reset { + info!( + "Resetting stale coordination for new PUNCH_ME_NOW round {}", + peer_round + ); + self.coordination = None; + } + if let Some(coord) = &mut self.coordination { if coord.round == peer_round { match coord.state { @@ -2731,8 +2755,28 @@ impl NatTraversalState { Ok(false) } } else { - debug!("Received peer coordination but no active round"); - Ok(false) + // No active coordination round — this is a relayed PUNCH_ME_NOW + // from a coordinator, targeting us. 
Start a new coordination round + // to initiate hole-punching toward the requesting peer. + info!( + "Received peer coordination with no active round — starting new round {}", + peer_round + ); + self.coordination = Some(CoordinationState { + round: peer_round, + punch_targets: Vec::new(), + round_start: now, + punch_start: now + Duration::from_millis(150), + round_duration: self.coordination_timeout, + state: CoordinationPhase::Preparing, + punch_request_sent: false, + peer_punch_received: true, + retry_count: 0, + max_retries: 3, + timeout_state: AdaptiveTimeoutState::new(), + last_retry_at: None, + }); + Ok(true) } } diff --git a/src/endpoint.rs b/src/endpoint.rs index ba4ed843b..010fb8a90 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -6,16 +6,16 @@ // Full details available at https://saorsalabs.com/licenses use std::{ - collections::{HashMap, VecDeque, hash_map}, + collections::{HashMap, hash_map}, convert::TryFrom, - fmt, mem, + fmt, + hash::Hash, + mem, net::{IpAddr, SocketAddr}, ops::{Index, IndexMut}, sync::Arc, }; -use indexmap::IndexMap; - use bytes::{BufMut, Bytes, BytesMut}; use rand::{Rng, RngCore, SeedableRng, rngs::StdRng}; use rustc_hash::FxHashMap; @@ -47,46 +47,6 @@ use crate::{ transport_parameters::{PreferredAddress, TransportParameters}, }; -/// A queued relay request for bootstrap nodes -#[derive(Debug, Clone)] -struct RelayQueueItem { - /// Target peer ID for the relay - target_peer_id: PeerId, - /// Frame to be relayed - frame: frame::PunchMeNow, - /// When this relay request was created - created_at: Instant, - /// Number of relay attempts made - attempts: u32, - /// Last attempt time - last_attempt: Option, -} - -/// Relay queue management for bootstrap nodes -#[derive(Debug)] -struct RelayQueue { - /// Pending relay requests with insertion order and O(1) access - pending: IndexMap, - /// Next sequence number for insertion order - next_seq: u64, - /// Maximum queue size to prevent memory exhaustion - max_queue_size: usize, - /// 
Timeout for relay requests - request_timeout: Duration, - /// Maximum retry attempts per request - max_retries: u32, - /// Minimum interval between retry attempts - retry_interval: Duration, - /// Rate limiting: track recent relay requests per peer - rate_limiter: HashMap>, - /// Maximum relays per peer per time window - max_relays_per_peer: usize, - /// Rate limiting time window - rate_limit_window: Duration, - /// Last time rate limiter was cleaned up (to avoid cleaning on every check) - last_rate_limit_cleanup: Option, -} - /// Address discovery statistics #[derive(Debug, Default, Clone)] pub struct AddressDiscoveryStats { @@ -119,193 +79,6 @@ pub struct RelayStats { pub current_queue_size: usize, } -impl RelayQueue { - /// Create a new relay queue with default settings - fn new() -> Self { - Self { - pending: IndexMap::new(), - next_seq: 0, - max_queue_size: 1000, // Reasonable default - request_timeout: Duration::from_secs(30), // 30 second timeout - max_retries: 3, - retry_interval: Duration::from_millis(500), // 500ms between retries - rate_limiter: HashMap::new(), - max_relays_per_peer: 10, // Max 10 relays per peer per time window - rate_limit_window: Duration::from_secs(60), // 1 minute window - last_rate_limit_cleanup: None, - } - } - - /// Add a relay request to the queue - fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool { - // Check queue size limit - if self.pending.len() >= self.max_queue_size { - warn!( - "Relay queue full, dropping request for peer {:?}", - target_peer_id - ); - return false; - } - - // Check rate limit for this peer - if !self.check_rate_limit(target_peer_id, now) { - warn!( - "Rate limit exceeded for peer {:?}, dropping relay request", - target_peer_id - ); - return false; - } - - let item = RelayQueueItem { - target_peer_id, - frame, - created_at: now, - attempts: 0, - last_attempt: None, - }; - - let seq = self.next_seq; - self.next_seq += 1; - self.pending.insert(seq, item); - - // 
Record this request for rate limiting - self.record_relay_request(target_peer_id, now); - - trace!( - "Queued relay request for peer {:?}, queue size: {}", - target_peer_id, - self.pending.len() - ); - true - } - - /// Check if a relay request is within rate limits - fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool { - // Only clean up periodically (every 10 seconds) to reduce overhead - const CLEANUP_INTERVAL: Duration = Duration::from_secs(10); - let should_cleanup = self - .last_rate_limit_cleanup - .is_none_or(|last| now.saturating_duration_since(last) >= CLEANUP_INTERVAL); - - if should_cleanup { - self.cleanup_rate_limiter(now); - self.last_rate_limit_cleanup = Some(now); - } - - // Check current request count for this peer - if let Some(requests) = self.rate_limiter.get(&peer_id) { - requests.len() < self.max_relays_per_peer - } else { - true // No previous requests, allow - } - } - - /// Record a relay request for rate limiting - fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) { - self.rate_limiter.entry(peer_id).or_default().push_back(now); - } - - /// Clean up old rate limiting entries - fn cleanup_rate_limiter(&mut self, now: Instant) { - self.rate_limiter.retain(|_, requests| { - requests.retain(|&request_time| { - now.saturating_duration_since(request_time) <= self.rate_limit_window - }); - !requests.is_empty() - }); - } - - /// Get the next relay request that's ready to be processed - fn next_ready(&mut self, now: Instant) -> Option { - // Find the first request that's ready to be retried - let mut expired_keys = Vec::new(); - let mut ready_key = None; - - for (seq, item) in &self.pending { - // Check if request has timed out - if now.saturating_duration_since(item.created_at) > self.request_timeout { - expired_keys.push(*seq); - continue; - } - - // Check if it's ready for retry - if item.attempts == 0 - || item - .last_attempt - .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval) - { 
- ready_key = Some(*seq); - break; - } - } - - // Remove expired items - for key in expired_keys { - if let Some(expired) = self.pending.shift_remove(&key) { - debug!( - "Relay request for peer {:?} timed out after {:?}", - expired.target_peer_id, - now.saturating_duration_since(expired.created_at) - ); - } - } - - // Return ready item if found - if let Some(key) = ready_key { - if let Some(mut item) = self.pending.shift_remove(&key) { - item.attempts += 1; - item.last_attempt = Some(now); - return Some(item); - } - } - - None - } - - /// Requeue a failed relay request if it hasn't exceeded max retries - fn requeue_failed(&mut self, item: RelayQueueItem) { - if item.attempts < self.max_retries { - trace!( - "Requeuing failed relay request for peer {:?}, attempt {}/{}", - item.target_peer_id, item.attempts, self.max_retries - ); - let seq = self.next_seq; - self.next_seq += 1; - self.pending.insert(seq, item); - } else { - debug!( - "Dropping relay request for peer {:?} after {} failed attempts", - item.target_peer_id, item.attempts - ); - } - } - - /// Clean up expired requests and return number of items cleaned - fn cleanup_expired(&mut self, now: Instant) -> usize { - let initial_len = self.pending.len(); - let timeout = self.request_timeout; - - // Use retain for O(n) in-place removal instead of O(n) collect + O(n) remove - self.pending.retain(|_seq, item| { - let expired = now.saturating_duration_since(item.created_at) > timeout; - if expired { - debug!( - "Removing expired relay request for peer {:?}", - item.target_peer_id - ); - } - !expired - }); - - initial_len - self.pending.len() - } - - /// Get current queue length - fn len(&self) -> usize { - self.pending.len() - } -} - /// The main entry point to the library /// /// This object performs no I/O whatsoever. 
Instead, it consumes incoming packets and @@ -326,8 +99,6 @@ pub struct Endpoint { all_incoming_buffers_total_bytes: u64, /// Mapping from peer IDs to connection handles for relay functionality peer_connections: HashMap, - /// Relay queue for bootstrap nodes - relay_queue: RelayQueue, /// Relay statistics relay_stats: RelayStats, /// Comprehensive relay statistics collector @@ -339,6 +110,16 @@ pub struct Endpoint { /// Pending relay events to be sent to other connections /// These are generated when a coordinator receives a PUNCH_ME_NOW with target_peer_id pending_relay_events: Vec<(ConnectionHandle, ConnectionEvent)>, + /// Pending hole-punch connection attempts to initiate + /// These are generated when a target node receives a relayed PUNCH_ME_NOW + pending_hole_punch_addrs: Vec, +} + +/// Deterministic 32-byte wire ID from a SocketAddr, used to correlate +/// PUNCH_ME_NOW relay targets across connections. Delegates to the shared +/// implementation in `crate::shared::wire_id_from_addr`. 
+fn wire_id_from_addr(addr: SocketAddr) -> [u8; 32] { + crate::shared::wire_id_from_addr(addr) } impl Endpoint { @@ -371,12 +152,12 @@ impl Endpoint { incoming_buffers: Slab::new(), all_incoming_buffers_total_bytes: 0, peer_connections: HashMap::new(), - relay_queue: RelayQueue::new(), relay_stats: RelayStats::default(), relay_stats_collector: RelayStatisticsCollector::new(), address_discovery_enabled: true, // Default to enabled address_change_callback: None, pending_relay_events: Vec::new(), + pending_hole_punch_addrs: Vec::new(), } } @@ -409,62 +190,19 @@ impl Endpoint { self.peer_connections.get(peer_id).copied() } - /// Queue a frame for relay to a target peer - pub(crate) fn queue_frame_for_peer( - &mut self, - peer_id: &PeerId, - frame: frame::PunchMeNow, - ) -> bool { - self.relay_stats.requests_received += 1; - - if let Some(ch) = self.lookup_peer_connection(peer_id) { - // Peer is currently connected, try to relay immediately - if self.relay_frame_to_connection(ch, frame.clone()) { - self.relay_stats.requests_relayed += 1; - // Record successful rate limiting decision - self.relay_stats_collector.record_rate_limit(true); - trace!( - "Immediately relayed frame to peer {:?} via connection {:?}", - peer_id, ch - ); - return true; - } - } - - // Peer not connected or immediate relay failed, queue for later - let now = Instant::now(); - if self.relay_queue.enqueue(*peer_id, frame, now) { - self.relay_stats.current_queue_size = self.relay_queue.len(); - // Record successful rate limiting decision - self.relay_stats_collector.record_rate_limit(true); - trace!("Queued relay request for peer {:?}", peer_id); - true - } else { - // Check if it was rate limited or queue full - if !self.relay_queue.check_rate_limit(*peer_id, now) { - self.relay_stats.requests_rate_limited += 1; - // Record rate limiting rejection - self.relay_stats_collector.record_rate_limit(false); - // Record error - self.relay_stats_collector.record_error("rate_limited"); - } else { - 
self.relay_stats.requests_dropped += 1; - // Record error for queue full - self.relay_stats_collector - .record_error("resource_exhausted"); - } - false - } - } - /// Attempt to relay a frame to a specific connection fn relay_frame_to_connection( &mut self, ch: ConnectionHandle, frame: frame::PunchMeNow, ) -> bool { + // Strip target_peer_id before relaying — the receiving peer should process + // this as a direct coordination instruction, not attempt to relay further. + let mut relayed_frame = frame; + relayed_frame.target_peer_id = None; + // Queue the PunchMeNow frame to the connection via a connection event - let event = ConnectionEvent(ConnectionEventInner::QueuePunchMeNow(frame)); + let event = ConnectionEvent(ConnectionEventInner::QueuePunchMeNow(relayed_frame)); if self.connections.get(ch.0).is_some() { // Store the event to be processed by the high-level layer @@ -489,113 +227,16 @@ impl Endpoint { self.pending_relay_events.drain(..) } + /// Drain pending hole-punch addresses that need connection attempts. + pub fn drain_hole_punch_addrs(&mut self) -> impl Iterator + '_ { + self.pending_hole_punch_addrs.drain(..) 
+ } + /// Set the peer ID for an existing connection pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) { if let Some(connection) = self.connections.get_mut(connection_handle.0) { connection.peer_id = Some(peer_id); self.register_peer(peer_id, connection_handle); - - // Process any queued relay requests for this peer - self.process_queued_relays_for_peer(peer_id); - } - } - - /// Process queued relay requests for a specific peer that just connected - fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) { - let mut processed = 0; - - // Collect only the sequence numbers for items to process (avoid cloning items) - let keys_to_process: Vec = self - .relay_queue - .pending - .iter() - .filter_map(|(seq, item)| { - if item.target_peer_id == peer_id { - Some(*seq) - } else { - None - } - }) - .collect(); - - // Remove and process items by key - for key in keys_to_process { - if let Some(item) = self.relay_queue.pending.shift_remove(&key) { - if let Some(ch) = self.lookup_peer_connection(&peer_id) { - if self.relay_frame_to_connection(ch, item.frame.clone()) { - self.relay_stats.requests_relayed += 1; - processed += 1; - trace!("Processed queued relay for peer {:?}", peer_id); - } else { - // Failed to relay, requeue - self.relay_queue.requeue_failed(item); - self.relay_stats.requests_failed += 1; - } - } - } - } - - self.relay_stats.current_queue_size = self.relay_queue.len(); - - if processed > 0 { - debug!("Processed {processed} queued relay requests for peer {peer_id:?}"); - } - } - - /// Process pending relay requests (should be called periodically) - pub fn process_relay_queue(&mut self) { - let now = Instant::now(); - let mut processed = 0; - let mut failed = 0; - - // Process ready relay requests - while let Some(item) = self.relay_queue.next_ready(now) { - if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) { - if self.relay_frame_to_connection(ch, item.frame.clone()) { - 
self.relay_stats.requests_relayed += 1; - processed += 1; - trace!( - "Successfully relayed frame to peer {:?}", - item.target_peer_id - ); - } else { - // Failed to relay, requeue for retry - self.relay_queue.requeue_failed(item); - self.relay_stats.requests_failed += 1; - // Record connection failure error - self.relay_stats_collector - .record_error("connection_failure"); - failed += 1; - } - } else { - // Peer not connected, requeue for later - self.relay_queue.requeue_failed(item); - // Record peer not found error - self.relay_stats_collector.record_error("peer_not_found"); - failed += 1; - } - } - - // Clean up expired requests - let expired = self.relay_queue.cleanup_expired(now); - if expired > 0 { - self.relay_stats.requests_timed_out += expired as u64; - // Record timeout errors for each expired request - for _ in 0..expired { - self.relay_stats_collector.record_error("request_timeout"); - } - debug!("Cleaned up {} expired relay requests", expired); - } - - self.relay_stats.current_queue_size = self.relay_queue.len(); - - if processed > 0 || failed > 0 { - trace!( - "Relay queue processing: {} processed, {} failed, {} in queue", - processed, - failed, - self.relay_queue.len() - ); } } @@ -617,11 +258,6 @@ impl Endpoint { &self.relay_stats_collector } - /// Get relay queue length - pub fn relay_queue_len(&self) -> usize { - self.relay_queue.len() - } - /// Process `EndpointEvent`s emitted from related `Connection`s /// /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`. @@ -652,16 +288,49 @@ impl Endpoint { } } } - RelayPunchMeNow(target_peer_id, punch_me_now) => { - // Handle relay request from bootstrap node - let peer_id = PeerId(target_peer_id); - if self.queue_frame_for_peer(&peer_id, punch_me_now) { - trace!( - "Successfully queued PunchMeNow frame for relay to peer {:?}", - peer_id + RelayPunchMeNow(target_peer_id, punch_me_now, _sender_addr) => { + // Relay PUNCH_ME_NOW to the target peer. 
+ // target_peer_id = wire_id_from_addr(target_address) computed by the sender. + // We recompute this hash for each connection's remote address to find the target. + tracing::info!( + "RelayPunchMeNow received: target_wire_id={:?}, {} connections to check", + &target_peer_id[..8], + self.connections.len() + ); + let found = self.connections.iter().find_map(|(idx, meta)| { + let wire_id = wire_id_from_addr(meta.addresses.remote); + tracing::debug!( + " checking connection {} remote={}: wire_id={:?} match={}", + idx, + meta.addresses.remote, + &wire_id[..8], + wire_id == target_peer_id ); + if wire_id == target_peer_id { + Some((ConnectionHandle(idx), meta.addresses.remote)) + } else { + None + } + }); + if let Some((target_ch, target_addr)) = found { + if self.relay_frame_to_connection(target_ch, punch_me_now) { + self.relay_stats.requests_relayed += 1; + tracing::info!( + "Relayed PUNCH_ME_NOW to {} via wire_id lookup", + target_addr + ); + } else { + tracing::warn!( + "Failed to relay PUNCH_ME_NOW to connection {:?} for {}", + target_ch, + target_addr + ); + } } else { - warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id); + tracing::warn!( + "No connection found for PUNCH_ME_NOW relay target (wire_id {:?})", + &target_peer_id[..8] + ); } } SendAddressFrame(add_address_frame) => { @@ -682,6 +351,17 @@ impl Endpoint { // with other components or logging/metrics collection debug!("NAT candidate {} validated successfully", address); } + InitiateHolePunch { peer_address } => { + // Queue a hole-punch connection attempt as a relay event. + // The high-level NatTraversalEndpoint will handle the actual + // QUIC connection initiation since it has the async runtime + // and client config. + tracing::info!("InitiateHolePunch event: peer_address={}", peer_address); + // Store the address for the high-level layer to act on. + // We use pending_relay_events with a special sentinel ConnectionHandle. + // The high-level endpoint will need to handle this. 
+ self.pending_hole_punch_addrs.push(peer_address); + } TryConnectTo { request_id, target_address, diff --git a/src/frame.rs b/src/frame.rs index 76ba0f81d..f879e193c 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -1484,10 +1484,8 @@ mod test { #[test] fn punch_me_now_with_target_peer_id() { - // Note: target_peer_id is only supported in legacy format, not RFC format - // This test verifies the legacy format can be encoded, but when decoded - // through the standard frame decoder, target_peer_id won't be preserved - // (as it's not part of the RFC format) + // target_peer_id is encoded as an extension field after the standard + // RFC fields. Verify it roundtrips correctly. let mut buf = Vec::new(); let target_peer_id = [0x42; 32]; // Test peer ID let addr = SocketAddr::from(([192, 168, 1, 100], 12345)); @@ -1497,7 +1495,6 @@ mod test { address: addr, target_peer_id: Some(target_peer_id), }; - // Use RFC encoding which doesn't include target_peer_id original.encode_rfc(&mut buf); let frames = frames(buf); assert_eq!(frames.len(), 1); @@ -1509,8 +1506,7 @@ mod test { original.paired_with_sequence_number ); assert_eq!(decoded.address, original.address); - // RFC format doesn't support target_peer_id - assert_eq!(decoded.target_peer_id, None); + assert_eq!(decoded.target_peer_id, Some(target_peer_id)); } x => panic!("incorrect frame {x:?}"), } @@ -1691,8 +1687,8 @@ mod test { decoded.paired_with_sequence_number ); assert_eq!(original_punch.address, decoded.address); - // RFC format doesn't support target_peer_id, so it should always be None - assert_eq!(decoded.target_peer_id, None); + // target_peer_id is encoded as an extension field + assert_eq!(decoded.target_peer_id, original_punch.target_peer_id); } _ => panic!("Expected PunchMeNow frame"), } diff --git a/src/frame/nat_traversal_unified.rs b/src/frame/nat_traversal_unified.rs index 4a4cdaad2..8eed190ae 100644 --- a/src/frame/nat_traversal_unified.rs +++ b/src/frame/nat_traversal_unified.rs @@ -284,6 +284,17 @@ 
impl PunchMeNow { buf.put_u16(addr.port()); } } + + // Encode target_peer_id for relay coordination (extension to RFC format) + match &self.target_peer_id { + Some(peer_id) => { + buf.put_u8(1); + buf.put_slice(peer_id); + } + None => { + buf.put_u8(0); + } + } Ok(()) } @@ -354,7 +365,28 @@ impl PunchMeNow { SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(octets), port)) }; - Ok(Self::new(round, paired_with_sequence_number, address)) + // Decode optional target_peer_id (relay coordination extension) + let target_peer_id = if r.remaining() >= 1 { + let has_peer_id = r.get::()?; + match has_peer_id { + 1 => { + if r.remaining() < 32 { + return Err(UnexpectedEnd); + } + let mut peer_id = [0u8; 32]; + r.copy_to_slice(&mut peer_id); + Some(peer_id) + } + 0 => None, + _ => return Err(UnexpectedEnd), + } + } else { + None + }; + + let mut frame = Self::new(round, paired_with_sequence_number, address); + frame.target_peer_id = target_peer_id; + Ok(frame) } /// Try to decode, detecting format automatically diff --git a/src/high_level/connection.rs b/src/high_level/connection.rs index efc1b49a3..b355ccecf 100644 --- a/src/high_level/connection.rs +++ b/src/high_level/connection.rs @@ -631,7 +631,10 @@ impl Connection { ) -> Result<(), crate::ConnectionError> { let conn = &mut *self.0.state.lock("send_nat_punch_coordination"); conn.inner - .send_nat_punch_coordination(paired_with_sequence_number, address, round) + .send_nat_punch_coordination(paired_with_sequence_number, address, round)?; + // Wake the connection driver so it transmits the queued frame + conn.wake(); + Ok(()) } /// Queue a PUNCH_ME_NOW frame via a coordinator to reach a target peer behind NAT @@ -646,7 +649,10 @@ impl Connection { ) -> Result<(), crate::ConnectionError> { let conn = &mut *self.0.state.lock("send_nat_punch_via_relay"); conn.inner - .send_nat_punch_via_relay(target_peer_id, our_address, round) + .send_nat_punch_via_relay(target_peer_id, our_address, round)?; + // Wake the connection driver so 
it transmits the queued frame + conn.wake(); + Ok(()) } /// The side of the connection (client or server) @@ -654,12 +660,14 @@ impl Connection { self.0.state.lock("side").inner.side() } - /// The peer's UDP address + /// The peer's UDP address at connection creation time. /// - /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when - /// switching to a cellular internet connection. + /// Note: this returns the address captured when the connection was first + /// established. If `ServerConfig::migration` is `true`, the peer may have + /// migrated to a different address since then. This value does not update + /// after migration. pub fn remote_address(&self) -> SocketAddr { - self.0.state.lock("remote_address").inner.remote_address() + self.0.initial_remote_addr } /// The external/reflexive address observed by the remote peer @@ -672,12 +680,17 @@ impl Connection { /// - Address discovery is not enabled /// - No OBSERVED_ADDRESS frame has been received yet /// - The connection hasn't completed the handshake + /// - The internal lock could not be acquired without blocking (lock + /// contention). In this case `None` does **not** mean that no observed + /// address exists — the caller should retry on the next poll cycle. pub fn observed_address(&self) -> Option { - self.0 - .state - .lock("observed_address") - .inner - .observed_address() + // Use try_lock to avoid blocking tokio workers. poll_discovery + // calls this every second on all connections; blocking here + // competes with ConnectionDriver::poll() for the ParkingMutex. 
+ match self.0.state.try_lock("observed_address") { + Some(guard) => guard.inner.observed_address(), + None => None, + } } /// The local IP address which was used when the peer established @@ -1116,7 +1129,9 @@ impl ConnectionRef { socket: Arc, runtime: Arc, ) -> Self { + let remote_addr = conn.remote_address(); Self(Arc::new(ConnectionInner { + initial_remote_addr: remote_addr, state: Mutex::new(State { inner: conn, driver: None, @@ -1184,6 +1199,7 @@ impl std::ops::Deref for ConnectionRef { pub(crate) struct ConnectionInner { pub(crate) state: Mutex, pub(crate) shared: Shared, + pub(crate) initial_remote_addr: SocketAddr, } #[derive(Debug, Default)] diff --git a/src/high_level/endpoint.rs b/src/high_level/endpoint.rs index f8f237b07..b77d178db 100644 --- a/src/high_level/endpoint.rs +++ b/src/high_level/endpoint.rs @@ -252,7 +252,22 @@ impl Endpoint { /// Set the client configuration used by `connect` pub fn set_default_client_config(&mut self, config: ClientConfig) { - self.default_client_config = Some(config); + self.default_client_config = Some(config.clone()); + // Also store in State so the driver can initiate hole-punch connections + if let Ok(mut state) = self.inner.0.state.lock() { + state.default_client_config = Some(config); + } + } + + /// Set the channel for forwarding hole-punch addresses to the NatTraversalEndpoint. + /// + /// When set, the endpoint driver will send hole-punch addresses through this channel + /// instead of doing fire-and-forget QUIC connections. This allows the NatTraversalEndpoint + /// to fully track and register the resulting connections. 
+ pub fn set_hole_punch_tx(&self, tx: mpsc::UnboundedSender) { + if let Ok(mut state) = self.inner.0.state.lock() { + state.hole_punch_tx = Some(tx); + } } /// Connect to a remote endpoint @@ -609,6 +624,11 @@ pub(crate) struct State { driver_lost: bool, runtime: Arc, stats: EndpointStats, + /// Client config for initiating hole-punch connections + default_client_config: Option, + /// Channel for forwarding hole-punch addresses to the NatTraversalEndpoint + /// for full connection tracking instead of fire-and-forget. + hole_punch_tx: Option>, } #[derive(Debug)] @@ -687,6 +707,61 @@ impl State { } } + // Process hole-punch connection attempts from relayed PUNCH_ME_NOW. + // Forward addresses to the NatTraversalEndpoint (via channel) for full + // connection tracking, or fall back to fire-and-forget if no channel. + let hole_punch_addrs: Vec = self.inner.drain_hole_punch_addrs().collect(); + for peer_address in hole_punch_addrs { + did_work = true; + if let Some(ref tx) = self.hole_punch_tx { + // Forward to NatTraversalEndpoint for full tracking + match tx.send(peer_address) { + Ok(()) => { + tracing::info!( + "Hole-punch: forwarded {} to NatTraversalEndpoint for tracked connection", + peer_address, + ); + } + Err(e) => { + tracing::warn!( + "Hole-punch: failed to forward {} (channel closed): {}", + peer_address, + e, + ); + } + } + } else if let Some(ref config) = self.default_client_config { + // Fallback: fire-and-forget (no NatTraversalEndpoint channel configured). + // This is intentional for backward compatibility: when no hole_punch_tx + // is configured we still send a QUIC Initial to create a NAT binding. + // The resulting connection handles (_ch, _conn) are deliberately + // discarded — Quinn's internal idle timeout will clean them up. 
+ let addr = if self.ipv6 { + SocketAddr::V6(ensure_ipv6(peer_address)) + } else { + peer_address + }; + match self + .inner + .connect(crate::Instant::now(), config.clone(), addr, "peer") + { + Ok((_ch, _conn)) => { + tracing::info!( + "Hole-punch: sent QUIC Initial to {} for NAT binding (fire-and-forget fallback)", + peer_address, + ); + } + Err(e) => { + tracing::warn!( + "Hole-punch: failed to initiate connection to {}: {:?}", + peer_address, + e + ); + } + } + } + } + did_work } } @@ -844,6 +919,8 @@ impl EndpointRef { recv_state, runtime, stats: EndpointStats::default(), + default_client_config: None, + hole_punch_tx: None, }), })) } diff --git a/src/high_level/mutex.rs b/src/high_level/mutex.rs index 25dad18d8..3251e0d86 100644 --- a/src/high_level/mutex.rs +++ b/src/high_level/mutex.rs @@ -51,6 +51,17 @@ mod tracking { } } + /// Tries to acquire the lock without blocking. + pub(crate) fn try_lock(&self, purpose: &'static str) -> Option> { + let now = Instant::now(); + let guard = self.inner.try_lock()?; + Some(MutexGuard { + guard, + start_time: now, + purpose, + }) + } + /// Acquires the lock for a certain purpose /// /// The purpose will be recorded in the list of last lock owners @@ -147,6 +158,14 @@ mod non_tracking { } } + /// Tries to acquire the lock without blocking. 
+ #[allow(unused_variables)] + pub(crate) fn try_lock(&self, purpose: &'static str) -> Option> { + Some(MutexGuard { + guard: self.inner.try_lock()?, + }) + } + /// Acquires the lock for a certain purpose /// /// The purpose will be recorded in the list of last lock owners diff --git a/src/link_transport_impl.rs b/src/link_transport_impl.rs index fd8a69be2..98482e692 100644 --- a/src/link_transport_impl.rs +++ b/src/link_transport_impl.rs @@ -667,17 +667,30 @@ impl LinkTransport for P2pLinkTransport { _proto: ProtocolId, ) -> BoxFuture<'_, LinkResult> { Box::pin(async move { - // Connect through P2pEndpoint - let _peer_conn = self + // Use connect_with_fallback for NAT traversal support: + // direct connect → hole-punching → relay fallback + let target_ipv4 = if addr.is_ipv4() { Some(addr) } else { None }; + let target_ipv6 = if addr.is_ipv6() { Some(addr) } else { None }; + + let (peer_conn, method) = self .endpoint - .connect(addr) + .connect_with_fallback(target_ipv4, target_ipv6, None) .await .map_err(|e| LinkError::ConnectionFailed(e.to_string()))?; - // Get the underlying QUIC connection by address + // The actual connected address may differ from the requested addr + // (e.g. when connected via relay or hole-punch) + let connected_addr = peer_conn.remote_addr.as_socket_addr().unwrap_or(addr); + + info!( + "dial_addr: connected to {} (requested {}) via {:?}", + connected_addr, addr, method + ); + + // Get the underlying QUIC connection by the actual connected address let conn = self .endpoint - .get_quic_connection(&addr) + .get_quic_connection(&connected_addr) .await .map_err(|e| LinkError::ConnectionFailed(e.to_string()))? 
.ok_or_else(|| LinkError::ConnectionFailed("Connection not found".to_string()))?; @@ -688,7 +701,7 @@ impl LinkTransport for P2pLinkTransport { .and_then(|id| id.downcast::>().ok()) .map(|boxed| *boxed); - Ok(P2pLinkConn::new(conn, public_key, addr)) + Ok(P2pLinkConn::new(conn, public_key, connected_addr)) }) } diff --git a/src/nat_traversal_api.rs b/src/nat_traversal_api.rs index 4aaa081d3..de753d237 100644 --- a/src/nat_traversal_api.rs +++ b/src/nat_traversal_api.rs @@ -12,14 +12,7 @@ //! QUIC connections through NATs using sophisticated hole punching and //! coordination protocols. -use std::{ - collections::hash_map::DefaultHasher, - fmt, - hash::{Hash, Hasher}, - net::SocketAddr, - sync::Arc, - time::Duration, -}; +use std::{fmt, net::SocketAddr, sync::Arc, time::Duration}; use crate::constrained::{ConstrainedEngine, EngineConfig, EngineEvent}; use crate::transport::TransportRegistry; @@ -276,10 +269,15 @@ pub struct NatTraversalEndpoint { event_tx: Option>, /// Receiver for internal event notifications /// Uses parking_lot::Mutex for faster, non-poisoning access - event_rx: ParkingMutex>, + event_rx: Arc>>, /// Notify waiters when a new ConnectionEstablished event is available. /// Eliminates the 10ms polling loop in accept_connection(). incoming_notify: Arc, + /// Channel for accepted connection addresses — the P2pEndpoint's + /// incoming_connection_forwarder reads from the receiver to register + /// accepted connections in connected_peers. + accepted_addrs_tx: mpsc::UnboundedSender, + accepted_addrs_rx: Arc>>, /// Notify waiters when the endpoint is shutting down. /// Eliminates polling loops that check the AtomicBool in transport listeners. 
shutdown_notify: Arc, @@ -324,6 +322,19 @@ pub struct NatTraversalEndpoint { /// P2pEndpoint polls this to receive data from constrained transports /// Uses TokioMutex (not ParkingMutex) because MutexGuard is held across .await constrained_event_rx: TokioMutex>, + /// Receiver for hole-punch addresses forwarded from the Quinn driver. + /// When a relayed PUNCH_ME_NOW triggers InitiateHolePunch at the Quinn level, + /// the address is sent through this channel so we can create a fully tracked + /// connection (DashMap + events + handlers) instead of fire-and-forget. + hole_punch_rx: TokioMutex>, + /// Channel for handshakes completing in the background. Spawned handshake + /// tasks send completed connections here, and accept_connection_direct + /// receives them. Persistent across calls so no connections are lost. + handshake_tx: mpsc::Sender>, + handshake_rx: TokioMutex>>, + /// Tracks when each connection was first observed as closed. + /// Used to enforce a grace period before removing dead connections. + closed_at: dashmap::DashMap, } /// Configuration for NAT traversal behavior @@ -1326,6 +1337,17 @@ impl NatTraversalEndpoint { // Create channel for forwarding constrained engine events to P2pEndpoint let (constrained_event_tx, constrained_event_rx) = mpsc::unbounded_channel(); + let (accepted_addrs_tx, accepted_addrs_rx) = mpsc::unbounded_channel(); + + // Channel for hole-punch addresses from Quinn driver → NatTraversalEndpoint + let (hole_punch_tx, hole_punch_rx) = mpsc::unbounded_channel(); + // Configure the inner endpoint to forward hole-punch addresses through the channel + // instead of doing fire-and-forget connections at the Quinn level. 
+ inner_endpoint.set_hole_punch_tx(hole_punch_tx); + + // Channel for background handshake completion (persistent across accept calls) + let (hs_tx, hs_rx) = mpsc::channel(32); + let endpoint = Self { inner_endpoint: Some(inner_endpoint.clone()), config: config.clone(), @@ -1335,8 +1357,10 @@ impl NatTraversalEndpoint { event_callback, shutdown: Arc::new(AtomicBool::new(false)), event_tx: Some(event_tx.clone()), - event_rx: ParkingMutex::new(event_rx), + event_rx: Arc::new(ParkingMutex::new(event_rx)), incoming_notify: Arc::new(tokio::sync::Notify::new()), + accepted_addrs_tx: accepted_addrs_tx.clone(), + accepted_addrs_rx: Arc::new(TokioMutex::new(accepted_addrs_rx)), shutdown_notify: Arc::new(tokio::sync::Notify::new()), connections: Arc::new(dashmap::DashMap::new()), timeout_config: config.timeouts.clone(), @@ -1351,6 +1375,10 @@ impl NatTraversalEndpoint { constrained_engine, constrained_event_tx: constrained_event_tx.clone(), constrained_event_rx: TokioMutex::new(constrained_event_rx), + hole_punch_rx: TokioMutex::new(hole_punch_rx), + handshake_tx: hs_tx, + handshake_rx: TokioMutex::new(hs_rx), + closed_at: dashmap::DashMap::new(), }; // Multi-transport listening: Spawn receive tasks for all online transports @@ -1506,31 +1534,13 @@ impl NatTraversalEndpoint { } } - // v0.13.0+: All nodes are symmetric P2P nodes - always start accepting connections - { - let endpoint_clone = inner_endpoint.clone(); - let shutdown_clone = endpoint.shutdown.clone(); - let event_tx_clone = event_tx.clone(); - let connections_clone = endpoint.connections.clone(); - let emitted_events_clone = emitted_established_events.clone(); - let relay_server_clone = endpoint.relay_server.clone(); - let incoming_notify_clone = endpoint.incoming_notify.clone(); - - tokio::spawn(async move { - Self::accept_connections( - endpoint_clone, - shutdown_clone, - event_tx_clone, - connections_clone, - emitted_events_clone, - relay_server_clone, - incoming_notify_clone, - ) - .await; - }); - - 
info!("Started accepting connections (symmetric P2P node)"); - } + // Spawn the unified accept loop. This background task handles Quinn + // accept + handshakes in parallel and feeds completed connections to + // accept_connection_direct() via a channel. Unlike the old + // accept_connections task, it doesn't register connections in + // P2pEndpoint — that's done by the caller of accept_connection_direct. + endpoint.spawn_accept_loop(); + info!("Accept loop spawned (unified path, parallel handshakes)"); // Start background discovery polling task let discovery_manager_clone = endpoint.discovery_manager.clone(); @@ -1724,6 +1734,17 @@ impl NatTraversalEndpoint { // Create channel for forwarding constrained engine events to P2pEndpoint let (constrained_event_tx, constrained_event_rx) = mpsc::unbounded_channel(); + let (accepted_addrs_tx, accepted_addrs_rx) = mpsc::unbounded_channel(); + + // Channel for hole-punch addresses from Quinn driver → NatTraversalEndpoint + let (hole_punch_tx, hole_punch_rx) = mpsc::unbounded_channel(); + // Configure the inner endpoint to forward hole-punch addresses through the channel + // instead of doing fire-and-forget connections at the Quinn level. 
+ inner_endpoint.set_hole_punch_tx(hole_punch_tx); + + // Channel for background handshake completion (persistent across accept calls) + let (hs_tx, hs_rx) = mpsc::channel(32); + let endpoint = Self { inner_endpoint: Some(inner_endpoint.clone()), config: config.clone(), @@ -1733,8 +1754,10 @@ impl NatTraversalEndpoint { event_callback, shutdown: Arc::new(AtomicBool::new(false)), event_tx: Some(event_tx.clone()), - event_rx: ParkingMutex::new(event_rx), + event_rx: Arc::new(ParkingMutex::new(event_rx)), incoming_notify: Arc::new(tokio::sync::Notify::new()), + accepted_addrs_tx: accepted_addrs_tx.clone(), + accepted_addrs_rx: Arc::new(TokioMutex::new(accepted_addrs_rx)), shutdown_notify: Arc::new(tokio::sync::Notify::new()), connections: Arc::new(dashmap::DashMap::new()), timeout_config: config.timeouts.clone(), @@ -1749,6 +1772,10 @@ impl NatTraversalEndpoint { constrained_engine, constrained_event_tx: constrained_event_tx.clone(), constrained_event_rx: TokioMutex::new(constrained_event_rx), + hole_punch_rx: TokioMutex::new(hole_punch_rx), + handshake_tx: hs_tx, + handshake_rx: TokioMutex::new(hs_rx), + closed_at: dashmap::DashMap::new(), }; // Multi-transport listening: Spawn receive tasks for all online transports @@ -1904,31 +1931,13 @@ impl NatTraversalEndpoint { } } - // v0.13.0+: All nodes are symmetric P2P nodes - always start accepting connections - { - let endpoint_clone = inner_endpoint.clone(); - let shutdown_clone = endpoint.shutdown.clone(); - let event_tx_clone = event_tx.clone(); - let connections_clone = endpoint.connections.clone(); - let emitted_events_clone = emitted_established_events.clone(); - let relay_server_clone = endpoint.relay_server.clone(); - let incoming_notify_clone = endpoint.incoming_notify.clone(); - - tokio::spawn(async move { - Self::accept_connections( - endpoint_clone, - shutdown_clone, - event_tx_clone, - connections_clone, - emitted_events_clone, - relay_server_clone, - incoming_notify_clone, - ) - .await; - }); - - 
info!("Started accepting connections (symmetric P2P node)"); - } + // Spawn the unified accept loop. This background task handles Quinn + // accept + handshakes in parallel and feeds completed connections to + // accept_connection_direct() via a channel. Unlike the old + // accept_connections task, it doesn't register connections in + // P2pEndpoint — that's done by the caller of accept_connection_direct. + endpoint.spawn_accept_loop(); + info!("Accept loop spawned (unified path, parallel handshakes)"); // Start background discovery polling task let discovery_manager_clone = endpoint.discovery_manager.clone(); @@ -2083,13 +2092,17 @@ impl NatTraversalEndpoint { target_addr, coordinator ); - // Create new session + // Create new session — start in Coordination phase directly. + // Discovery is for finding our own external address, which we + // already know. For remote targets behind NAT, there are no + // local candidates to discover. The Coordination phase sends + // PUNCH_ME_NOW to the coordinator to initiate hole-punching. let session = NatTraversalSession { target_addr, coordinator, attempt: 1, started_at: std::time::Instant::now(), - phase: TraversalPhase::Discovery, + phase: TraversalPhase::Coordination, candidates: Vec::new(), session_state: SessionState { state: ConnectionState::Connecting, @@ -2132,21 +2145,10 @@ impl NatTraversalEndpoint { } /// Generate a deterministic 32-byte identifier from a SocketAddr for wire - /// protocol frames (PUNCH_ME_NOW, ADDRESS_DISCOVERY). This is a legacy - /// format used in coordination messages, not a session key. + /// protocol frames (PUNCH_ME_NOW, ADDRESS_DISCOVERY). Delegates to the + /// shared implementation in `crate::shared::wire_id_from_addr`. 
fn wire_id_from_addr(addr: SocketAddr) -> [u8; 32] { - let mut hasher = DefaultHasher::new(); - addr.hash(&mut hasher); - - let hash = hasher.finish(); - let mut bytes = [0u8; 32]; - bytes[0..8].copy_from_slice(&hash.to_be_bytes()); - // Repeat hash for remaining bytes to make it deterministic - bytes[8..16].copy_from_slice(&hash.to_le_bytes()); - bytes[16..24].copy_from_slice(&hash.to_be_bytes()); - bytes[24..32].copy_from_slice(&hash.to_le_bytes()); - - bytes + crate::shared::wire_id_from_addr(addr) } /// Poll all active sessions and update their states @@ -2738,6 +2740,7 @@ impl NatTraversalEndpoint { let emitted_events_clone = self.emitted_established_events.clone(); let relay_server_clone = self.relay_server.clone(); let incoming_notify_clone = self.incoming_notify.clone(); + let accepted_addrs_tx_clone = self.accepted_addrs_tx.clone(); tokio::spawn(async move { Self::accept_connections( @@ -2748,6 +2751,7 @@ impl NatTraversalEndpoint { emitted_events_clone, relay_server_clone, incoming_notify_clone, + accepted_addrs_tx_clone, ) .await; }); @@ -2764,6 +2768,7 @@ impl NatTraversalEndpoint { emitted_events: Arc>, relay_server: Option>, incoming_notify: Arc, + accepted_addrs_tx: mpsc::UnboundedSender, ) { while !shutdown.load(Ordering::Relaxed) { match endpoint.accept().await { @@ -2773,6 +2778,7 @@ impl NatTraversalEndpoint { let emitted_events = emitted_events.clone(); let relay_server = relay_server.clone(); let incoming_notify = incoming_notify.clone(); + let accepted_addrs_tx = accepted_addrs_tx.clone(); tokio::spawn(async move { match connecting.await { Ok(connection) => { @@ -2783,10 +2789,24 @@ impl NatTraversalEndpoint { let public_key = Self::extract_public_key_from_connection(&connection); - // Store the connection keyed by remote address - // DashMap provides fine-grained locking internally - no blocking + // Store the connection keyed by remote address. 
+ // Always overwrite — the latest connection from the + // accept handler is most likely alive, replacing any + // dead duplicate from simultaneous-open. connections.insert(remote_address, connection.clone()); + // Notify the P2pEndpoint's forwarder about the new connection + match accepted_addrs_tx.send(remote_address) { + Ok(()) => info!( + "accept_connections: sent {} to forwarder channel", + remote_address + ), + Err(e) => error!( + "accept_connections: forwarder channel send FAILED for {}: {}", + remote_address, e + ), + } + // Only emit ConnectionEstablished if we haven't already for this address // DashSet::insert returns true if the value was newly inserted let should_emit = emitted_events.insert(remote_address); @@ -3490,6 +3510,131 @@ impl NatTraversalEndpoint { } } + /// Accept the next connection (incoming or hole-punched). + /// + /// Returns connections from a background accept loop that handles Quinn + /// accept, handshake completion, and outgoing hole-punch connections. + /// This method never holds locks across await points — it simply reads + /// from the handshake channel. + pub async fn accept_connection_direct( + &self, + ) -> Result<(SocketAddr, InnerConnection), NatTraversalError> { + let mut rx = self.handshake_rx.lock().await; + loop { + if self.shutdown.load(Ordering::Relaxed) { + return Err(NatTraversalError::NetworkError( + "Endpoint shutting down".to_string(), + )); + } + + match rx.recv().await { + Some(Ok((addr, conn))) => return Ok((addr, conn)), + Some(Err(_)) => continue, + None => { + return Err(NatTraversalError::NetworkError( + "Accept channel closed".to_string(), + )); + } + } + } + } + + /// Spawn the background accept loop that feeds `accept_connection_direct`. + /// + /// This task owns the Quinn accept and processes handshakes in parallel. 
+ /// Outgoing hole-punch connections are detected via `incoming_notify` and + /// looked up directly in the connections DashMap, avoiding competing + /// consumers on the `event_rx` channel (which is drained by `poll()`). + /// All completed connections are sent through `handshake_tx`. + fn spawn_accept_loop(&self) { + let endpoint = match self.inner_endpoint.clone() { + Some(ep) => ep, + None => return, + }; + let tx = self.handshake_tx.clone(); + let connections = self.connections.clone(); + let emitted = self.emitted_established_events.clone(); + let relay_server = self.relay_server.clone(); + let event_tx_opt = self.event_tx.clone(); + let shutdown = self.shutdown.clone(); + let incoming_notify = self.incoming_notify.clone(); + + tokio::spawn(async move { + loop { + if shutdown.load(Ordering::Relaxed) { + return; + } + + // Race Quinn accept against hole-punch notify. + // When incoming_notify fires, a new outgoing hole-punch + // connection was inserted into the DashMap. We forward any + // newly-emitted connections to the handshake channel. + let connecting = tokio::select! { + Some(c) = endpoint.accept() => c, + _ = incoming_notify.notified() => { + // Hole-punch completed — check DashMap for new + // outgoing connections and forward them. + let mut outgoing_conns = Vec::new(); + for entry in connections.iter() { + let addr = *entry.key(); + if emitted.insert(addr) { + // First time seeing this address — forward it. + outgoing_conns.push((addr, entry.value().clone())); + } + } + for (addr, conn) in outgoing_conns { + let _ = tx.send(Ok((addr, conn))).await; + } + continue; + } + }; + + // Spawn handshake in background so we immediately loop back + // to accept the next incoming connection. 
+ let tx2 = tx.clone(); + let connections2 = connections.clone(); + let emitted2 = emitted.clone(); + let relay_server2 = relay_server.clone(); + let event_tx2 = event_tx_opt.clone(); + tokio::spawn(async move { + let connection = match connecting.await { + Ok(conn) => conn, + Err(e) => { + debug!("Accept handshake failed: {}", e); + let _ = tx2.send(Err(e.to_string())).await; + return; + } + }; + + let remote_address = connection.remote_address(); + info!("Accepted connection from {} (unified path)", remote_address); + + connections2.insert(remote_address, connection.clone()); + emitted2.insert(remote_address); + + if let Some(ref server) = relay_server2 { + let conn_clone = connection.clone(); + let server_clone = Arc::clone(server); + tokio::spawn(async move { + Self::handle_relay_requests(conn_clone, server_clone).await; + }); + } + + if let Some(ref etx) = event_tx2 { + let etx = etx.clone(); + let addr = remote_address; + let conn = connection.clone(); + tokio::spawn(async move { + Self::handle_connection(addr, conn, etx).await; + }); + } + + let _ = tx2.send(Ok((remote_address, connection))).await; + }); + } + }); + } + /// Returns a reference to the connection notification handle. /// /// This `Notify` is triggered whenever a `ConnectionEstablished` event @@ -3501,20 +3646,21 @@ impl NatTraversalEndpoint { /// Check if we have a live connection to the given address. /// - /// If the connection exists but is dead (closed/draining), removes it + /// If the connection exists but is dead (has a `close_reason`), removes it /// from the connection table and returns `false`. This enables automatic /// cleanup of phantom connections during deduplication checks. pub fn is_connected(&self, addr: &SocketAddr) -> bool { - if let Some(conn) = self.connections.get(addr) { - if conn.is_alive() { - return true; + if let Some(entry) = self.connections.get(addr) { + if entry.value().close_reason().is_some() { + // Connection is dead — remove it and report not connected. 
+ drop(entry); // release the DashMap ref before removing + self.connections.remove(addr); + return false; } - // Connection is dead -- drop the DashMap ref before removing - drop(conn); - // Clean up dead connection - let _ = self.remove_connection(addr); + true + } else { + false } - false } /// Get an active connection by remote address @@ -3529,6 +3675,21 @@ impl NatTraversalEndpoint { .map(|entry| entry.value().clone())) } + /// Get the receiver for accepted connection addresses. + /// The P2pEndpoint's incoming_connection_forwarder uses this to register + /// accepted connections in connected_peers. + pub fn accepted_addrs_rx(&self) -> Arc>> { + Arc::clone(&self.accepted_addrs_rx) + } + + /// Iterate over all connections in the DashMap. + pub fn connections_iter( + &self, + ) -> impl Iterator> + { + self.connections.iter() + } + /// Add or update a connection for a remote address pub fn add_connection( &self, @@ -3543,6 +3704,32 @@ impl NatTraversalEndpoint { "add_connection: now have {} connections", self.connections.len() ); + + // Register connected peer as a potential coordinator for NAT traversal. + // In the symmetric P2P architecture (v0.13.0+), any connected node can + // coordinate hole-punching for us. + { + let mut nodes = self.bootstrap_nodes.write(); + if !nodes.iter().any(|n| n.address == addr) { + nodes.push(BootstrapNode { + address: addr, + last_seen: std::time::Instant::now(), + can_coordinate: true, + rtt: None, + coordination_count: 0, + }); + info!( + "add_connection: registered {} as NAT traversal coordinator ({} total)", + addr, + nodes.len() + ); + } + } + + // Notify waiters that a new connection is available. + // This wakes up try_hole_punch loops waiting for the target connection. 
+ self.incoming_notify.notify_waiters(); + Ok(()) } @@ -4656,8 +4843,15 @@ impl NatTraversalEndpoint { } } - /// Detect closed connections and emit ConnectionLost events + /// Detect closed connections, emit ConnectionLost events, and reap stale + /// entries after a 5-second grace period. + /// + /// The grace period prevents removing connections that are briefly closed + /// during simultaneous-open deduplication but then replaced by a live one. fn poll_closed_connections(&self, events: &mut Vec) { + let now = std::time::Instant::now(); + let grace_period = std::time::Duration::from_secs(5); + let closed_connections: Vec<_> = self .connections .iter() @@ -4670,7 +4864,24 @@ impl NatTraversalEndpoint { .collect(); for (addr, reason) in closed_connections { - self.connections.remove(&addr); + // Record the time we first observed this connection as closed. + let first_seen_closed = *self.closed_at.entry(addr).or_insert(now); + + if now.duration_since(first_seen_closed) >= grace_period { + // Grace period elapsed — remove the dead connection. + self.connections.remove(&addr); + self.closed_at.remove(&addr); + debug!( + "Connection to {} closed: {}, removed after grace period", + addr, reason + ); + } else { + debug!( + "Connection to {} closed: {}, keeping for grace period", + addr, reason + ); + } + self.emit_event( events, NatTraversalEvent::ConnectionLost { @@ -5338,6 +5549,69 @@ impl NatTraversalEndpoint { } } + /// Send the coordination request (PUNCH_ME_NOW) if the session is ready. + /// + /// This is a targeted alternative to poll() that only sends the coordination + /// request without iterating all sessions or connections, avoiding the + /// DashMap deadlock risk in poll(). 
+ pub fn send_coordination_request_if_ready( + &self, + target: SocketAddr, + coordinator: SocketAddr, + ) -> Result<(), NatTraversalError> { + // Check if we have an active session that needs coordination + if let Some(mut session) = self.active_sessions.get_mut(&target) { + if matches!(session.phase, TraversalPhase::Coordination) { + session.phase = TraversalPhase::Synchronization; + drop(session); // Release DashMap lock before sending + self.send_coordination_request(target, coordinator)?; + } + } + Ok(()) + } + + /// Drain pending hole-punch addresses forwarded from the Quinn driver and + /// create fully tracked connections for each. + /// + /// This is called from the session driver task to process addresses that were + /// forwarded from the Quinn-level `InitiateHolePunch` event handler. Unlike + /// the previous fire-and-forget approach, these connections are stored in the + /// DashMap, emit events, and have handlers spawned — so the node can actually + /// receive and respond to data on them. + pub async fn process_pending_hole_punches(&self) { + let mut rx = self.hole_punch_rx.lock().await; + while let Ok(peer_address) = rx.try_recv() { + info!( + "Processing hole-punch address from Quinn driver: {}", + peer_address + ); + if let Err(e) = self.attempt_hole_punch_connection(peer_address) { + warn!( + "Failed to initiate tracked hole-punch connection to {}: {}", + peer_address, e + ); + } + } + } + + /// Attempt a QUIC connection to a peer address for hole-punching. + /// + /// Sends QUIC Initial packets to the target address, creating a NAT binding + /// from our socket. Called when we receive a relayed PUNCH_ME_NOW from a + /// coordinator, indicating a remote peer wants to reach us. 
+ pub fn attempt_hole_punch_connection( + &self, + peer_address: SocketAddr, + ) -> Result<(), NatTraversalError> { + let candidate = CandidateAddress { + address: peer_address, + priority: 100, + source: CandidateSource::Peer, + state: CandidateState::New, + }; + self.attempt_connection_to_candidate(peer_address, &candidate) + } + /// Check if any hole punch succeeded fn check_punch_results(&self, addr: &SocketAddr) -> Option { // Check if we have an established connection to this address @@ -5603,7 +5877,11 @@ impl NatTraversalEndpoint { Ok(()) } - crate::shared::EndpointEventInner::RelayPunchMeNow(_target_peer_id, punch_frame) => { + crate::shared::EndpointEventInner::RelayPunchMeNow( + _target_peer_id, + punch_frame, + _sender_addr, + ) => { // RFC-compliant address-based relay: find peer by address let target_address = punch_frame.address; let normalized_target = normalize_socket_addr(target_address); diff --git a/src/p2p_endpoint.rs b/src/p2p_endpoint.rs index 8387a9ff2..1dc1fb615 100644 --- a/src/p2p_endpoint.rs +++ b/src/p2p_endpoint.rs @@ -182,6 +182,17 @@ pub struct P2pEndpoint { /// it and calls `do_cleanup_connection` immediately — no waiting for the /// periodic stale reaper. reader_exit_tx: mpsc::UnboundedSender, + + /// In-flight connection attempts, keyed by target address. + /// + /// When multiple concurrent `connect_with_fallback` calls target the same + /// address (e.g., 3 chunks all needing the same NATed node), only the first + /// call does the actual connection work. Subsequent callers subscribe to a + /// broadcast channel and wait for the result instead of starting parallel + /// hole-punch attempts that deadlock the runtime. 
+ pending_dials: Arc< + tokio::sync::Mutex>>>, + >, } impl std::fmt::Debug for P2pEndpoint { @@ -718,6 +729,7 @@ impl P2pEndpoint { reader_tasks, reader_handles, reader_exit_tx, + pending_dials: Arc::new(tokio::sync::Mutex::new(HashMap::new())), }; // Spawn background constrained poller task @@ -731,6 +743,16 @@ impl P2pEndpoint { // task detects a dead QUIC connection, without waiting for the reaper. endpoint.spawn_reader_exit_handler(reader_exit_rx); + // Spawn NAT traversal session driver — periodically polls the + // NatTraversalEndpoint to advance sessions through Discovery → + // Coordination → Punching phases. Runs independently of + // try_hole_punch to avoid DashMap lock contention deadlocks. + endpoint.spawn_session_driver(); + + // Spawn incoming connection forwarder — bridges accepted connections + // from the NatTraversalEndpoint to P2pEndpoint's connected_peers. + endpoint.spawn_incoming_connection_forwarder(); + Ok(endpoint) } @@ -1298,14 +1320,95 @@ impl P2pEndpoint { return Err(EndpointError::ShuttingDown); } + // Dedup: if another task is already connecting to this target, wait for + // its result instead of starting a parallel attempt. This prevents + // multiple concurrent hole-punch sessions that deadlock the runtime. 
+ let target = target_ipv4.or(target_ipv6); + if let Some(target_addr) = target { + let mut pending = self.pending_dials.lock().await; + if let Some(tx) = pending.get(&target_addr) { + // Another task is already connecting — subscribe and wait + let mut rx = tx.subscribe(); + drop(pending); + info!( + "connect_with_fallback: waiting for in-flight dial to {}", + target_addr + ); + match rx.recv().await { + Ok(Ok(conn)) => { + return Ok(( + conn, + ConnectionMethod::HolePunched { + coordinator: target_addr, + }, + )); + } + Ok(Err(_)) | Err(_) => { + // Primary dial failed — fall through and try ourselves + } + } + } else { + // We're the first — register ourselves + let (tx, _) = broadcast::channel(4); + pending.insert(target_addr, tx); + drop(pending); + } + } + + // Do the actual connection work + let result = self + .connect_with_fallback_inner(target_ipv4, target_ipv6, strategy_config) + .await; + + // Broadcast result to any waiters and clean up pending entry + if let Some(target_addr) = target { + let mut pending = self.pending_dials.lock().await; + if let Some(tx) = pending.remove(&target_addr) { + match &result { + Ok((conn, _)) => { + let _ = tx.send(Ok(conn.clone())); + } + Err(e) => { + let _ = tx.send(Err(e.to_string())); + } + } + } + } + + result + } + + /// Inner implementation of connect_with_fallback (separated for dedup wrapper). + async fn connect_with_fallback_inner( + &self, + target_ipv4: Option, + target_ipv6: Option, + strategy_config: Option, + ) -> Result<(PeerConnection, ConnectionMethod), EndpointError> { // Build strategy config with coordinator and relay from our config let mut config = strategy_config.unwrap_or_default(); if config.coordinator.is_none() { + // Try known_peers first (configured bootstrap nodes) config.coordinator = self .config .known_peers .first() .and_then(|addr| addr.as_socket_addr()); + + // If no known_peers, use any currently connected peer as coordinator. 
+ // In the symmetric P2P architecture, any connected node can coordinate + // NAT traversal for us. + if config.coordinator.is_none() { + let peers = self.connected_peers.read().await; + let target = target_ipv4.or(target_ipv6); + config.coordinator = peers.keys().find(|&&addr| Some(addr) != target).copied(); + if let Some(coord) = config.coordinator { + info!( + "Using connected peer {} as NAT traversal coordinator", + coord + ); + } + } } if config.relay_addrs.is_empty() { // Optimization: Try to find a high-quality relay from our cache first @@ -1337,6 +1440,24 @@ impl P2pEndpoint { config.relay_addrs.push(relay_addr); } } + + // If still no relay addresses, use connected peers as relay candidates. + // In the symmetric architecture, every node runs a MASQUE relay server. + if config.relay_addrs.is_empty() { + let peers = self.connected_peers.read().await; + let target = target_ipv4.or(target_ipv6); + for &addr in peers.keys() { + if Some(addr) != target { + config.relay_addrs.push(addr); + } + } + if !config.relay_addrs.is_empty() { + info!( + "Using {} connected peer(s) as relay candidates", + config.relay_addrs.len() + ); + } + } } let mut strategy = ConnectionStrategy::new(config); @@ -1356,6 +1477,49 @@ impl P2pEndpoint { } loop { + // Check if a previous hole-punch attempt established the connection + // asynchronously (e.g. the target connected to us after receiving + // a relayed PUNCH_ME_NOW from a prior round). 
+ let target = target_ipv4.or(target_ipv6); + if let Some(target_addr) = target { + if self.inner.is_connected(&target_addr) { + info!( + "connect_with_fallback: connection to {} established asynchronously", + target_addr + ); + let peer_conn = PeerConnection { + public_key: None, + remote_addr: TransportAddr::Quic(target_addr), + authenticated: true, + connected_at: Instant::now(), + last_activity: Instant::now(), + }; + // Spawn background reader task for data reception + if let Ok(Some(conn)) = self.inner.get_connection(&target_addr) { + self.spawn_reader_task(target_addr, conn).await; + } + + self.connected_peers + .write() + .await + .insert(target_addr, peer_conn.clone()); + + // Broadcast PeerConnected so the identity exchange is triggered + let _ = self.event_tx.send(P2pEvent::PeerConnected { + addr: TransportAddr::Quic(target_addr), + public_key: peer_conn.public_key.clone(), + side: Side::Client, + }); + + return Ok(( + peer_conn, + ConnectionMethod::HolePunched { + coordinator: target_addr, // approximate + }, + )); + } + } + match strategy.current_stage().clone() { ConnectionStage::DirectIPv4 { .. } => { // Use Happy Eyeballs (RFC 8305) to race all direct addresses (IPv4 + IPv6) @@ -1469,6 +1633,19 @@ impl P2pEndpoint { return Ok((conn, ConnectionMethod::HolePunched { coordinator })); } Ok(Err(e)) => { + // After a failed hole-punch round, try a quick direct + // connect — the NAT binding may have been created by + // the target's outgoing packets even though our + // try_hole_punch didn't detect the connection. 
+ if let Ok(Ok(peer_conn)) = + timeout(Duration::from_secs(3), self.connect(target)).await + { + info!("✓ Post-hole-punch direct connect succeeded to {}", target); + return Ok(( + peer_conn, + ConnectionMethod::HolePunched { coordinator }, + )); + } strategy.record_holepunch_error(round, e.to_string()); if strategy.should_retry_holepunch() { debug!("Hole-punch round {} failed, retrying", round); @@ -1479,6 +1656,16 @@ impl P2pEndpoint { } } Err(_) => { + // Same: try a quick direct connect after timeout + if let Ok(Ok(peer_conn)) = + timeout(Duration::from_secs(3), self.connect(target)).await + { + info!("✓ Post-hole-punch direct connect succeeded to {}", target); + return Ok(( + peer_conn, + ConnectionMethod::HolePunched { coordinator }, + )); + } strategy.record_holepunch_error(round, "Timeout".to_string()); if strategy.should_retry_holepunch() { debug!("Hole-punch round {} timed out, retrying", round); @@ -1650,13 +1837,14 @@ impl P2pEndpoint { self.connect(coordinator).await?; } - // Derive an inner PeerId for the NAT traversal layer from the target address - // Initiate NAT traversal (inner layer uses SocketAddr) + // Initiate NAT traversal — sends PUNCH_ME_NOW to coordinator self.inner .initiate_nat_traversal(target, coordinator) .map_err(EndpointError::NatTraversal)?; - // Poll for completion with event-driven notification instead of sleep loop + // Poll for the connection to appear. The target node will receive + // the relayed PUNCH_ME_NOW and initiate a QUIC connection to us, + // which gets accepted by saorsa-core's transport handler. let deadline = tokio::time::Instant::now() + Duration::from_secs(15); loop { @@ -1664,50 +1852,36 @@ impl P2pEndpoint { return Err(EndpointError::ShuttingDown); } - let events = self - .inner - .poll(Instant::now()) - .map_err(EndpointError::NatTraversal)?; - - for event in events { - match event { - NatTraversalEvent::ConnectionEstablished { - remote_address, - public_key, - .. 
- } if remote_address == target => { - let peer_conn = PeerConnection { - public_key: public_key.clone(), - remote_addr: TransportAddr::Quic(remote_address), - authenticated: true, - connected_at: Instant::now(), - last_activity: Instant::now(), - }; - - // Spawn background reader task BEFORE storing in connected_peers - if let Ok(Some(conn)) = self.inner.get_connection(&target) { - self.spawn_reader_task(remote_address, conn).await; - } - - self.connected_peers - .write() - .await - .insert(remote_address, peer_conn.clone()); + // Check NatTraversalEndpoint's connections + if self.inner.is_connected(&target) { + info!("try_hole_punch: connection to {} established", target); + let peer_conn = PeerConnection { + public_key: None, + remote_addr: TransportAddr::Quic(target), + authenticated: true, + connected_at: Instant::now(), + last_activity: Instant::now(), + }; + self.connected_peers + .write() + .await + .insert(target, peer_conn.clone()); + return Ok(peer_conn); + } - return Ok(peer_conn); - } - NatTraversalEvent::TraversalFailed { - remote_address, - error, - .. - } if remote_address == target => { - return Err(EndpointError::NatTraversal(error)); - } - _ => {} + // Check P2pEndpoint's connected_peers (populated by saorsa-core) + { + let peers = self.connected_peers.read().await; + if let Some(existing) = peers.get(&target) { + info!("try_hole_punch: connection to {} found in peers", target); + return Ok(existing.clone()); } } - // Wait for connection notification, shutdown, or timeout + // The background session driver (spawn_session_driver) calls + // poll() every 500ms to advance sessions. We just wait here. + + // Wait briefly then re-check, or timeout tokio::select! 
{ _ = self.inner.connection_notify().notified() => {} _ = self.shutdown.cancelled() => { @@ -1716,6 +1890,8 @@ impl P2pEndpoint { _ = tokio::time::sleep_until(deadline) => { return Err(EndpointError::Timeout); } + // Wake periodically to drive session and re-check connections + _ = tokio::time::sleep(Duration::from_millis(500)) => {} } } } @@ -1769,7 +1945,7 @@ impl P2pEndpoint { } let result = tokio::select! { - r = self.inner.accept_connection() => r, + r = self.inner.accept_connection_direct() => r, _ = self.shutdown.cancelled() => return None, }; @@ -1798,8 +1974,23 @@ impl P2pEndpoint { // Spawn background reader task BEFORE storing in connected_peers // to prevent race where recv() misses early data - if let Ok(Some(conn)) = self.inner.get_connection(&remote_addr) { - self.spawn_reader_task(remote_addr, conn).await; + match self.inner.get_connection(&remote_addr) { + Ok(Some(conn)) => { + info!("accept: spawning reader task for {}", remote_addr); + self.spawn_reader_task(remote_addr, conn).await; + } + Ok(None) => { + error!( + "accept: get_connection({}) returned None — NO reader task spawned!", + remote_addr + ); + } + Err(e) => { + error!( + "accept: get_connection({}) failed: {} — NO reader task spawned!", + remote_addr, e + ); + } } self.connected_peers @@ -1906,13 +2097,41 @@ impl P2pEndpoint { return Err(EndpointError::ShuttingDown); } - // Get peer's transport address - let peer_info = self.connected_peers.read().await; - let peer_conn = peer_info - .get(addr) - .ok_or(EndpointError::PeerNotFound(*addr))?; - let transport_addr = peer_conn.remote_addr.clone(); - drop(peer_info); // Release read lock before async operations + // Get peer's transport address and optionally capture the connection + // for hole-punched peers that bypassed normal registration. 
+ let (transport_addr, cached_connection) = { + let peer_info = self.connected_peers.read().await; + if let Some(peer_conn) = peer_info.get(addr) { + (peer_conn.remote_addr.clone(), None) + } else { + // Check if the NatTraversalEndpoint has a connection to this + // address (e.g. from a hole-punch that bypassed the normal path). + // Capture the connection now before it can be cleaned up. + drop(peer_info); + if let Ok(Some(conn)) = self.inner.get_connection(addr) { + info!( + "send: found hole-punched connection to {}, registering", + addr + ); + let peer_conn = PeerConnection { + public_key: None, + remote_addr: TransportAddr::Quic(*addr), + authenticated: true, + connected_at: Instant::now(), + last_activity: Instant::now(), + }; + self.connected_peers.write().await.insert(*addr, peer_conn); + let _ = self.event_tx.send(P2pEvent::PeerConnected { + addr: TransportAddr::Quic(*addr), + public_key: None, + side: Side::Server, + }); + (TransportAddr::Quic(*addr), Some(conn)) + } else { + return Err(EndpointError::PeerNotFound(*addr)); + } + } + }; // Select protocol engine based on transport address let engine = { @@ -1922,35 +2141,49 @@ impl P2pEndpoint { match engine { crate::transport::ProtocolEngine::Quic => { - // Use existing QUIC connection (UDP transport) - let connection = self - .inner - .get_connection(addr) - .map_err(EndpointError::NatTraversal)? - .ok_or(EndpointError::PeerNotFound(*addr))?; - - let mut send_stream = connection - .open_uni() - .await - .map_err(|e| EndpointError::Connection(e.to_string()))?; + // Use cached connection (from hole-punch) or look up fresh + let connection = if let Some(conn) = cached_connection { + conn + } else { + self.inner + .get_connection(addr) + .map_err(EndpointError::NatTraversal)? + .ok_or(EndpointError::PeerNotFound(*addr))? 
+ }; - send_stream - .write_all(data) - .await - .map_err(|e| EndpointError::Connection(e.to_string()))?; + let mut send_stream = connection.open_uni().await.map_err(|e| { + warn!("send({}): open_uni failed: {}", addr, e); + EndpointError::Connection(e.to_string()) + })?; + + send_stream.write_all(data).await.map_err(|e| { + warn!( + "send({}): write_all ({} bytes) failed: {}", + addr, + data.len(), + e + ); + EndpointError::Connection(e.to_string()) + })?; - send_stream - .finish() - .map_err(|e| EndpointError::Connection(e.to_string()))?; + send_stream.finish().map_err(|e| { + warn!("send({}): finish failed: {}", addr, e); + EndpointError::Connection(e.to_string()) + })?; // Wait for the peer to acknowledge receipt of all stream data. // Without this, finish() only buffers a FIN locally — if the // connection is dead the caller would see Ok(()) despite the // data never arriving. // - // Bounded by send_ack_timeout so phantom connections don't - // block the caller for the full QUIC idle timeout (~30 s). - let ack_timeout = self.config.timeouts.send_ack_timeout; + // The timeout is adaptive: base timeout (from config) plus + // 1 second per 100 KB of payload, so large transfers on + // slow NAT-traversed connections are not prematurely killed. + let size_secs = (data.len() / 100_000) as u64; + let ack_timeout = std::cmp::max( + self.config.timeouts.send_ack_timeout, + Duration::from_secs(size_secs), + ); match timeout(ack_timeout, send_stream.stopped()).await { Ok(Ok(None)) => {} Ok(Ok(Some(stop_code))) => { @@ -2022,11 +2255,6 @@ impl P2pEndpoint { } } - // Update last activity - if let Some(peer_conn) = self.connected_peers.write().await.get_mut(addr) { - peer_conn.last_activity = Instant::now(); - } - Ok(()) } @@ -2213,18 +2441,18 @@ impl P2pEndpoint { /// The task exits naturally when the connection is closed or the channel is dropped. 
async fn spawn_reader_task(&self, addr: SocketAddr, connection: crate::high_level::Connection) { let data_tx = self.data_tx.clone(); - let connected_peers = Arc::clone(&self.connected_peers); let event_tx = self.event_tx.clone(); let max_read_bytes = self.config.max_message_size; let exit_tx = self.reader_exit_tx.clone(); let abort_handle = self.reader_tasks.lock().await.spawn(async move { + info!("Reader task STARTED for {}", addr); loop { // Accept the next unidirectional stream let mut recv_stream = match connection.accept_uni().await { Ok(stream) => stream, Err(e) => { - debug!("Reader task for {} ending: accept_uni error: {}", addr, e); + info!("Reader task for {} ending: accept_uni error: {}", addr, e); break; } }; @@ -2233,18 +2461,19 @@ impl P2pEndpoint { Ok(data) if data.is_empty() => continue, Ok(data) => data, Err(e) => { - debug!("Reader task for {}: read_to_end error: {}", addr, e); + info!("Reader task for {}: read_to_end error: {}", addr, e); break; } }; let data_len = data.len(); - tracing::trace!("Reader task: {} bytes from {}", data_len, addr); + info!("Reader task: {} bytes from {}", data_len, addr); - // Update last_activity - if let Some(peer_conn) = connected_peers.write().await.get_mut(&addr) { - peer_conn.last_activity = Instant::now(); - } + // Note: last_activity update moved out of the hot path to avoid + // RwLock write contention. With N reader tasks all acquiring + // write locks on every message, the lock becomes a bottleneck + // that can starve other tasks and deadlock the runtime. + // The DataReceived event below serves as a liveness signal. // Emit DataReceived event let _ = event_tx.send(P2pEvent::DataReceived { @@ -2252,10 +2481,35 @@ impl P2pEndpoint { bytes: data_len, }); - // Send through channel; if the receiver is dropped, exit - if data_tx.send((addr, data)).await.is_err() { - debug!("Reader task for {}: channel closed, exiting", addr); - break; + // Send through channel without blocking the reader task's + // event loop. 
Using try_send avoids holding a tokio worker + // thread when the channel is full. If the channel is full, + // spawn a short-lived task that retries with a timeout instead + // of dropping data immediately. + match data_tx.try_send((addr, data)) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full((addr, data))) => { + let tx = data_tx.clone(); + let data_len = data.len(); + tokio::spawn(async move { + if tokio::time::timeout( + Duration::from_secs(5), + tx.send((addr, data)), + ) + .await + .is_err() + { + warn!( + "Reader task for {}: data channel send timed out, dropping {} bytes", + addr, data_len + ); + } + }); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + debug!("Reader task for {}: channel closed, exiting", addr); + break; + } } } @@ -2539,6 +2793,93 @@ impl P2pEndpoint { }); } + /// Spawn a background task that periodically drives the NAT traversal + /// session state machine via `poll()`. + /// + /// This runs `poll()` on its own task, decoupled from `try_hole_punch`, + /// to avoid DashMap lock contention deadlocks between `poll()` and the + /// concurrent accept handler. + fn spawn_session_driver(&self) { + let inner = Arc::clone(&self.inner); + let shutdown = self.shutdown.clone(); + + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_millis(500)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = interval.tick() => {} + _ = shutdown.cancelled() => { + debug!("NAT traversal session driver shutting down"); + return; + } + } + + // Drive the session state machine. Errors are non-fatal — + // the session will retry on the next tick. + if let Err(e) = inner.poll(Instant::now()) { + debug!("NAT traversal poll error (will retry): {:?}", e); + } + + // Process any hole-punch addresses forwarded from the Quinn driver. + // These are addresses from relayed PUNCH_ME_NOW that need fully tracked + // outgoing connections (not fire-and-forget). 
+ inner.process_pending_hole_punches().await; + } + }); + } + + /// Spawn a background task that monitors for new connections accepted by + /// the NatTraversalEndpoint and registers them in `connected_peers` + + /// emits `PeerConnected` events. This bridges the gap between the + /// NatTraversalEndpoint's accept handler and the P2pEndpoint's tracking. + fn spawn_incoming_connection_forwarder(&self) { + debug!("FORWARDER_DEBUG: spawn_incoming_connection_forwarder called"); + let connected_peers = Arc::clone(&self.connected_peers); + let event_tx = self.event_tx.clone(); + let shutdown = self.shutdown.clone(); + let accepted_rx = self.inner.accepted_addrs_rx(); + + tokio::spawn(async move { + debug!("FORWARDER_DEBUG: started, acquiring rx lock..."); + let mut rx = accepted_rx.lock().await; + info!("Incoming connection forwarder: rx lock acquired, waiting for addresses..."); + loop { + let addr = tokio::select! { + Some(addr) = rx.recv() => { + info!("Incoming connection forwarder: received address {}", addr); + addr + }, + _ = shutdown.cancelled() => return, + }; + + // Check if already registered + if connected_peers.read().await.contains_key(&addr) { + continue; + } + + info!( + "Incoming connection forwarder: registering {} in connected_peers", + addr + ); + let peer_conn = PeerConnection { + public_key: None, + remote_addr: TransportAddr::Quic(addr), + authenticated: true, + connected_at: Instant::now(), + last_activity: Instant::now(), + }; + connected_peers.write().await.insert(addr, peer_conn); + let _ = event_tx.send(P2pEvent::PeerConnected { + addr: TransportAddr::Quic(addr), + public_key: None, + side: Side::Server, + }); + } + }); + } + // v0.2: authenticate_peer removed - TLS handles peer authentication via ML-DSA-65 } @@ -2565,6 +2906,7 @@ impl Clone for P2pEndpoint { reader_tasks: Arc::clone(&self.reader_tasks), reader_handles: Arc::clone(&self.reader_handles), reader_exit_tx: self.reader_exit_tx.clone(), + pending_dials: 
Arc::clone(&self.pending_dials), } } } diff --git a/src/shared.rs b/src/shared.rs index 1949cec48..1ad95f210 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -74,13 +74,21 @@ pub(crate) enum EndpointEventInner { /// When `bool == true`, a new connection ID will be issued to peer RetireConnectionId(Instant, u64, bool), /// Request to relay a PunchMeNow frame to a target peer - RelayPunchMeNow([u8; 32], crate::frame::PunchMeNow), + /// Fields: (target_peer_id, coordination_frame, target_remote_address) + RelayPunchMeNow([u8; 32], crate::frame::PunchMeNow, std::net::SocketAddr), /// Request to send an AddAddress frame to the peer #[allow(dead_code)] SendAddressFrame(crate::frame::AddAddress), /// NAT traversal candidate validation succeeded #[allow(dead_code)] NatCandidateValidated { address: SocketAddr, challenge: u64 }, + /// Initiate a hole-punch connection attempt to a peer's address. + /// Emitted by the target node when it receives a relayed PUNCH_ME_NOW, + /// triggering QUIC Initial packets to create a NAT binding. + InitiateHolePunch { + /// The peer's external address to connect to + peer_address: SocketAddr, + }, /// Request to attempt connection to a target address (NAT callback mechanism) TryConnectTo { request_id: crate::VarInt, @@ -232,3 +240,37 @@ pub fn normalize_socket_addr(addr: SocketAddr) -> SocketAddr { SocketAddr::V4(_) => addr, } } + +/// Deterministic 32-byte wire identifier from a `SocketAddr`. +/// +/// Used to correlate PUNCH_ME_NOW relay targets across connections. +/// The encoding is deterministic (no hashing): IP bytes are written directly +/// into a 32-byte array with a version-byte prefix. 
///
/// Layout for IPv4 (`[4, ip0..ip3, port_hi, port_lo, 0..]`):
///   byte  0     = 4 (version tag)
///   bytes 1-4   = IPv4 octets
///   bytes 5-6   = port (big-endian)
///   bytes 7-31  = zero padding
///
/// Layout for IPv6 (`[6, ip0..ip15, port_hi, port_lo, 0..]`):
///   byte  0     = 6 (version tag)
///   bytes 1-16  = IPv6 octets
///   bytes 17-18 = port (big-endian)
///   bytes 19-31 = zero padding
///
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are encoded with the IPv4
/// layout. A dual-stack socket reports IPv4 peers in mapped form, so encoding
/// them as IPv6 would give the same endpoint two different identifiers
/// depending on which socket observed it, and correlation would silently fail.
///
/// Only the IP and port contribute to the identifier; IPv6 flow info and
/// scope id are ignored.
pub fn wire_id_from_addr(addr: SocketAddr) -> [u8; 32] {
    let mut bytes = [0u8; 32];
    let port = addr.port().to_be_bytes();

    // Canonicalise first so `1.2.3.4:p` and `[::ffff:1.2.3.4]:p` agree.
    let ip = match addr.ip() {
        std::net::IpAddr::V6(v6) => match v6.to_ipv4_mapped() {
            Some(v4) => std::net::IpAddr::V4(v4),
            None => std::net::IpAddr::V6(v6),
        },
        v4 => v4,
    };

    match ip {
        std::net::IpAddr::V4(v4) => {
            bytes[0] = 4;
            bytes[1..5].copy_from_slice(&v4.octets());
            bytes[5..7].copy_from_slice(&port);
        }
        std::net::IpAddr::V6(v6) => {
            bytes[0] = 6;
            bytes[1..17].copy_from_slice(&v6.octets());
            bytes[17..19].copy_from_slice(&port);
        }
    }
    bytes
}