From a7f63fe282278bbf9674997a4ee7d297d5b2dbbb Mon Sep 17 00:00:00 2001 From: "Peter Chen J." <34339487+peter941221@users.noreply.github.com> Date: Sat, 4 Apr 2026 00:03:01 +0800 Subject: [PATCH] fix: recover outbound opens via live secondary handles --- src/protocol/notification/config.rs | 3 +- src/protocol/notification/handle.rs | 65 +++- src/protocol/notification/mod.rs | 22 +- src/protocol/transport_service.rs | 561 ++++++++++++++++++++++++++-- 4 files changed, 595 insertions(+), 56 deletions(-) diff --git a/src/protocol/notification/config.rs b/src/protocol/notification/config.rs index f7ff08f3e..0c93beb28 100644 --- a/src/protocol/notification/config.rs +++ b/src/protocol/notification/config.rs @@ -94,11 +94,12 @@ impl Config { let (command_tx, command_rx) = channel(DEFAULT_CHANNEL_SIZE); let handshake = Arc::new(RwLock::new(handshake)); let handle = NotificationHandle::new( + protocol_name.clone(), + sync_channel_size, event_rx, notif_rx, command_tx, Arc::clone(&handshake), - protocol_name.clone(), ); ( diff --git a/src/protocol/notification/handle.rs b/src/protocol/notification/handle.rs index c631efba6..25e10acca 100644 --- a/src/protocol/notification/handle.rs +++ b/src/protocol/notification/handle.rs @@ -170,6 +170,12 @@ impl NotificationSink { /// Handle allowing the user protocol to interact with the notification protocol. #[derive(Debug)] pub struct NotificationHandle { + /// Protocol name served by this handle. + protocol_name: ProtocolName, + + /// Configured synchronous channel size. + sync_channel_size: usize, + /// RX channel for receiving events from the notification protocol. event_rx: Receiver, @@ -190,21 +196,21 @@ pub struct NotificationHandle { /// Handshake. handshake: Arc>>, - - /// Protocol name. - protocol_name: ProtocolName, } impl NotificationHandle { /// Create new [`NotificationHandle`]. pub(crate) fn new( + protocol_name: ProtocolName, + sync_channel_size: usize, event_rx: Receiver, notif_rx: Receiver<(PeerId, BytesMut)>, command_tx: Sender, handshake: Arc>>, - protocol_name: ProtocolName, ) -> Self { Self { + protocol_name, + sync_channel_size, event_rx, notif_rx, command_tx, @@ -410,16 +416,34 @@ impl NotificationHandle { Err(error) => match error { NotificationError::NoConnection => Err(NotificationError::NoConnection), NotificationError::ChannelClogged => { - let _ = self.clogged.insert(peer).then(|| { - tracing::warn!( + if self.clogged.insert(peer) { + match self.command_tx.try_send(NotificationCommand::ForceClose { peer }) + { + Ok(()) => tracing::warn!( + target: LOG_TARGET, + ?peer, + protocol = %self.protocol_name, + sync_channel_size = self.sync_channel_size, + "sync notification channel clogged, queueing force close", + ), + Err(error) => tracing::warn!( + target: LOG_TARGET, + ?peer, + protocol = %self.protocol_name, + sync_channel_size = self.sync_channel_size, + ?error, + "sync notification channel clogged, failed to queue force close", + ), + } + } else { + tracing::debug!( target: LOG_TARGET, ?peer, - protocol_name = ?self.protocol_name, - "notification channel clogged, force close connection", + protocol = %self.protocol_name, + sync_channel_size = self.sync_channel_size, + "sync notification channel still clogged, force close already queued", ); - - self.command_tx.try_send(NotificationCommand::ForceClose { peer }) - }); + } Err(NotificationError::ChannelClogged) } @@ -495,7 +519,14 @@ impl Stream for NotificationHandle { } InnerNotificationEvent::NotificationStreamClosed { peer } => { self.peers.remove(&peer); - self.clogged.remove(&peer); + if self.clogged.remove(&peer) { + tracing::debug!( + target: LOG_TARGET, + ?peer, + protocol = %self.protocol_name, + "cleared clogged state after notification stream closed", + ); + } return Poll::Ready(Some(NotificationEvent::NotificationStreamClosed { peer, @@ -517,22 +548,24 @@ impl Stream for NotificationHandle { handshake, })); } - InnerNotificationEvent::NotificationStreamOpenFailure { peer, error } => + InnerNotificationEvent::NotificationStreamOpenFailure { peer, error } => { return Poll::Ready(Some( NotificationEvent::NotificationStreamOpenFailure { peer, error }, - )), + )) + } }, } match futures::ready!(self.notif_rx.poll_recv(cx)) { None => return Poll::Ready(None), - Some((peer, notification)) => + Some((peer, notification)) => { if self.peers.contains_key(&peer) { return Poll::Ready(Some(NotificationEvent::NotificationReceived { peer, notification, })); - }, + } + } } } } diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index a139fd71f..25b2f5151 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -1409,8 +1409,9 @@ impl NotificationProtocol { let (tx, rx) = oneshot::channel(); self.pending_validations.push(Box::pin(async move { match rx.await { - Ok(ValidationResult::Accept) => - (peer, ValidationResult::Accept), + Ok(ValidationResult::Accept) => { + (peer, ValidationResult::Accept) + } _ => (peer, ValidationResult::Reject), } })); @@ -1827,7 +1828,22 @@ impl NotificationProtocol { } } NotificationCommand::ForceClose { peer } => { - let _ = self.service.force_close(peer); + tracing::warn!( + target: LOG_TARGET, + ?peer, + protocol = %self.protocol, + "processing force close command after notification channel clog", + ); + + if let Err(error) = self.service.force_close(peer) { + tracing::warn!( + target: LOG_TARGET, + ?peer, + protocol = %self.protocol, + ?error, + "failed to force close connection after notification channel clog", + ); + } } #[cfg(feature = "fuzz")] NotificationCommand::SendNotification{ .. } => unreachable!() diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index ec06b239a..e39ee3680 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -21,7 +21,10 @@ use crate::{ addresses::PublicAddresses, error::{Error, ImmediateDialError, SubstreamError}, - protocol::{connection::ConnectionHandle, InnerTransportEvent, TransportEvent}, + protocol::{ + connection::{ConnectionHandle, Permit}, + InnerTransportEvent, TransportEvent, + }, transport::{manager::TransportManagerHandle, Endpoint}, types::{protocol::ProtocolName, ConnectionId, SubstreamId}, PeerId, DEFAULT_CHANNEL_SIZE, @@ -63,6 +66,31 @@ struct ConnectionContext { secondary: Option, } +#[derive(Debug)] +enum OpenSubstreamConnection { + Primary { + connection_id: ConnectionId, + permit: Permit, + }, + PromotedSecondary { + stale_primary: ConnectionId, + connection_id: ConnectionId, + permit: Permit, + }, + NoUsableConnection { + primary: ConnectionId, + secondary: Option, + }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct ConnectionSnapshot { + primary_id: ConnectionId, + primary_active: bool, + secondary_id: Option, + secondary_active: Option, +} + impl ConnectionContext { /// Create new [`ConnectionContext`]. fn new(primary: ConnectionHandle) -> Self { @@ -118,6 +146,69 @@ impl ConnectionContext { "connection doesn't exist, cannot upgrade", ); } + + fn snapshot(&self) -> ConnectionSnapshot { + ConnectionSnapshot { + primary_id: *self.primary.connection_id(), + primary_active: self.primary.is_active(), + secondary_id: self.secondary.as_ref().map(|handle| *handle.connection_id()), + secondary_active: self.secondary.as_ref().map(ConnectionHandle::is_active), + } + } + + /// Pick the best currently usable connection for opening an outbound substream. + /// + /// If the primary handle has gone stale but a live secondary exists, the secondary is promoted + /// before the caller opens the substream. This keeps the transport view aligned with the + /// actually usable connection instead of repeatedly preferring a dead primary handle. + fn select_connection_for_outbound_substream(&mut self) -> OpenSubstreamConnection { + let snapshot = self.snapshot(); + let primary_id = *self.primary.connection_id(); + + if let Some(permit) = self.primary.try_get_permit() { + return OpenSubstreamConnection::Primary { + connection_id: primary_id, + permit, + }; + } + + let Some(secondary) = self.secondary.as_ref() else { + tracing::debug!( + target: LOG_TARGET, + ?snapshot, + "primary handle unusable and no secondary connection is available", + ); + + return OpenSubstreamConnection::NoUsableConnection { + primary: primary_id, + secondary: None, + }; + }; + + let secondary_id = *secondary.connection_id(); + let Some(permit) = secondary.try_get_permit() else { + tracing::debug!( + target: LOG_TARGET, + ?snapshot, + "both primary and secondary handles are currently unusable", + ); + + return OpenSubstreamConnection::NoUsableConnection { + primary: primary_id, + secondary: Some(secondary_id), + }; + }; + + let secondary = + self.secondary.take().expect("secondary handle exists when permit was acquired"); + self.primary = secondary; + + OpenSubstreamConnection::PromotedSecondary { + stale_primary: primary_id, + connection_id: secondary_id, + permit, + } + } } /// Tracks connection keep-alive timeouts. @@ -276,6 +367,12 @@ impl SubstreamKeepAlive { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum OpenSubstreamAttemptKind { + Primary, + PromotedSecondary, +} + /// Provides an interfaces for [`Litep2p`](crate::Litep2p) protocols to interact /// with the underlying transport protocols. #[derive(Debug)] @@ -427,7 +524,7 @@ impl TransportService { ?peer, ?connection_id, protocol = %self.protocol, - current_state = ?self.connections.get(&peer), + current_state = ?self.connections.get(&peer).map(ConnectionContext::snapshot), "on connection closed", ); @@ -459,6 +556,16 @@ impl TransportService { match context.secondary.take() { None => { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?connection_id, + protocol = %self.protocol, + matched_role = "primary", + promoted_secondary = false, + "primary connection closed and no replacement exists", + ); + self.connections.remove(&peer); return Some(TransportEvent::ConnectionClosed { peer }); } @@ -468,10 +575,20 @@ impl TransportService { ?peer, ?connection_id, protocol = %self.protocol, + matched_role = "primary", + promoted_secondary = true, + replacement_connection = ?handle.connection_id(), "switch to secondary connection", ); context.primary = handle; + tracing::trace!( + target: LOG_TARGET, + ?peer, + protocol = %self.protocol, + new_state = ?context.snapshot(), + "post-close connection state after secondary promotion", + ); return None; } } @@ -484,6 +601,7 @@ impl TransportService { ?peer, ?connection_id, protocol = %self.protocol, + matched_role = "secondary", "secondary connection closed", ); @@ -496,6 +614,7 @@ impl TransportService { ?connection_id, ?connection_state, protocol = %self.protocol, + matched_role = "unknown", "connection closed but it doesn't exist", ); @@ -559,46 +678,128 @@ impl TransportService { /// Call fails if there is no connection open to `peer` or the channel towards /// the connection is clogged. pub fn open_substream(&mut self, peer: PeerId) -> Result { - // always prefer the primary connection - let connection = &mut self - .connections - .get_mut(&peer) - .ok_or(SubstreamError::PeerDoesNotExist(peer))? - .primary; + let substream_id = + SubstreamId::from(self.next_substream_id.fetch_add(1usize, Ordering::Relaxed)); + let mut retried_after_primary_send_failure = false; + + loop { + let (attempt_kind, connection_id, permit) = match self + .connections + .get_mut(&peer) + .ok_or(SubstreamError::PeerDoesNotExist(peer))? + .select_connection_for_outbound_substream() + { + OpenSubstreamConnection::Primary { + connection_id, + permit, + } => (OpenSubstreamAttemptKind::Primary, connection_id, permit), + OpenSubstreamConnection::PromotedSecondary { + stale_primary, + connection_id, + permit, + } => { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?stale_primary, + ?connection_id, + protocol = %self.protocol, + "primary handle was stale, promoted secondary before opening substream", + ); - let connection_id = *connection.connection_id(); + self.keep_alive_tracker.on_connection_closed(peer, stale_primary); + ( + OpenSubstreamAttemptKind::PromotedSecondary, + connection_id, + permit, + ) + } + OpenSubstreamConnection::NoUsableConnection { primary, secondary } => { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?primary, + ?secondary, + protocol = %self.protocol, + "all known connection handles were stale, waiting for close propagation", + ); - // This permit will be passed on until the substream is reported back to - // [`TransportService`] in [`InnerTransportEvent::SubstreamOpened`] and connection - // upgraded. - let permit = connection.try_get_permit().ok_or(SubstreamError::ConnectionClosed)?; + self.keep_alive_tracker.on_connection_closed(peer, primary); + if let Some(secondary) = secondary { + self.keep_alive_tracker.on_connection_closed(peer, secondary); + } - let substream_id = - SubstreamId::from(self.next_substream_id.fetch_add(1usize, Ordering::Relaxed)); + return Err(SubstreamError::ConnectionClosed); + } + }; - tracing::trace!( - target: LOG_TARGET, - ?peer, - protocol = %self.protocol, - ?substream_id, - ?connection_id, - "open substream", - ); + tracing::trace!( + target: LOG_TARGET, + ?peer, + protocol = %self.protocol, + ?substream_id, + ?connection_id, + ?attempt_kind, + "open substream", + ); - if self.substream_keep_alive == SubstreamKeepAlive::Yes { - self.keep_alive_tracker.substream_activity(peer, connection_id); - connection.try_upgrade(); - } + let open_result = self + .connections + .get_mut(&peer) + .expect("peer context must exist while opening substream") + .primary + .open_substream( + self.protocol.clone(), + self.fallback_names.clone(), + substream_id, + permit, + self.substream_keep_alive, + ); - connection - .open_substream( - self.protocol.clone(), - self.fallback_names.clone(), - substream_id, - permit, - self.substream_keep_alive, - ) - .map(|_| substream_id) + match open_result { + Ok(()) => { + let connection = &mut self + .connections + .get_mut(&peer) + .expect("peer context must exist after successful open") + .primary; + + if self.substream_keep_alive == SubstreamKeepAlive::Yes { + self.keep_alive_tracker.substream_activity(peer, connection_id); + connection.try_upgrade(); + } + + return Ok(substream_id); + } + Err(SubstreamError::ConnectionClosed) + if attempt_kind == OpenSubstreamAttemptKind::Primary + && !retried_after_primary_send_failure => + { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?connection_id, + protocol = %self.protocol, + "primary send path closed during outbound open, retrying with a fresh connection selection", + ); + + self.keep_alive_tracker.on_connection_closed(peer, connection_id); + self.connections + .get_mut(&peer) + .expect("peer context must exist after send failure") + .primary + .close(); + + retried_after_primary_send_failure = true; + continue; + } + Err(SubstreamError::ConnectionClosed) => { + self.keep_alive_tracker.on_connection_closed(peer, connection_id); + return Err(SubstreamError::ConnectionClosed); + } + Err(error) => return Err(error), + } + } } /// Forcibly close the connection, even if other protocols have substreams open over it. @@ -769,6 +970,230 @@ mod tests { (service, sender, cmd_rx) } + /// Deterministic two-connection harness for outbound-open races. + /// + /// The primary connection is created as a dialer connection and the secondary connection as a + /// listener connection so the test names can describe the transport state transitions directly + /// without repeating the full setup each time. + struct TwoConnectionHarness { + service: TransportService, + sender: Sender, + peer: PeerId, + primary_connection_id: ConnectionId, + secondary_connection_id: ConnectionId, + primary_commands: Option>, + secondary_commands: Receiver, + } + + impl TwoConnectionHarness { + async fn new() -> Self { + let (mut service, sender, _) = transport_service(); + let peer = PeerId::random(); + let primary_connection_id = ConnectionId::from(0usize); + let secondary_connection_id = ConnectionId::from(1usize); + + let primary_commands = Some( + Self::establish_connection( + &mut service, + &sender, + peer, + primary_connection_id, + Endpoint::dialer(Multiaddr::empty(), primary_connection_id), + true, + ) + .await, + ); + let secondary_commands = Self::establish_connection( + &mut service, + &sender, + peer, + secondary_connection_id, + Endpoint::listener(Multiaddr::empty(), secondary_connection_id), + false, + ) + .await; + + Self { + service, + sender, + peer, + primary_connection_id, + secondary_connection_id, + primary_commands, + secondary_commands, + } + } + + async fn establish_connection( + service: &mut TransportService, + sender: &Sender, + peer: PeerId, + connection_id: ConnectionId, + endpoint: Endpoint, + expect_connection_established: bool, + ) -> Receiver { + let expected_address = endpoint.address().clone(); + let (cmd_tx, cmd_rx) = channel(64); + + sender + .send(InnerTransportEvent::ConnectionEstablished { + peer, + connection: connection_id, + endpoint, + sender: ConnectionHandle::new(connection_id, cmd_tx), + }) + .await + .unwrap(); + + if expect_connection_established { + if let Some(TransportEvent::ConnectionEstablished { + peer: connected_peer, + endpoint, + }) = service.next().await + { + assert_eq!(connected_peer, peer); + assert_eq!(endpoint.address(), &expected_address); + } else { + panic!("expected event from `TransportService`"); + }; + } else { + poll_service(service).await; + } + + cmd_rx + } + + fn primary_connection_id(&self) -> ConnectionId { + self.primary_connection_id + } + + fn secondary_connection_id(&self) -> ConnectionId { + self.secondary_connection_id + } + + fn context(&self) -> &ConnectionContext { + self.service.connections.get(&self.peer).expect("peer to exist in harness") + } + + fn downgrade_primary(&mut self) { + self.service + .connections + .get_mut(&self.peer) + .expect("peer to exist in harness") + .downgrade(&self.primary_connection_id); + } + + fn downgrade_secondary(&mut self) { + self.service + .connections + .get_mut(&self.peer) + .expect("peer to exist in harness") + .downgrade(&self.secondary_connection_id); + } + + fn drop_primary_command_receiver(&mut self) { + self.primary_commands.take(); + } + + fn assert_primary_handle_is_stale(&self) { + assert!(self.context().primary.try_get_permit().is_none()); + } + + fn assert_connection_layout( + &self, + primary_connection_id: ConnectionId, + secondary_connection_id: Option, + ) { + let context = self.context(); + + assert_eq!(context.primary.connection_id(), &primary_connection_id); + assert_eq!( + context.secondary.as_ref().map(|handle| *handle.connection_id()), + secondary_connection_id + ); + } + + fn assert_keep_alive_tracks(&self, tracked_connections: &[ConnectionId]) { + assert_eq!( + self.service.keep_alive_tracker.last_activity.len(), + tracked_connections.len() + ); + + for connection_id in tracked_connections { + assert!(self + .service + .keep_alive_tracker + .last_activity + .contains_key(&(self.peer, *connection_id))); + } + } + + fn assert_keep_alive_does_not_track(&self, connection_id: ConnectionId) { + assert!(!self + .service + .keep_alive_tracker + .last_activity + .contains_key(&(self.peer, connection_id))); + } + + fn assert_no_keep_alive(&self) { + assert!(self.service.keep_alive_tracker.last_activity.is_empty()); + } + + fn assert_no_primary_command(&mut self) { + if let Some(primary_commands) = self.primary_commands.as_mut() { + assert!(primary_commands.try_recv().is_err()); + } + } + + fn assert_no_secondary_command(&mut self) { + assert!(self.secondary_commands.try_recv().is_err()); + } + + fn assert_peer_removed(&self) { + assert!(self.service.connections.get(&self.peer).is_none()); + } + + fn open_substream(&mut self) -> Result { + self.service.open_substream(self.peer) + } + + async fn close_connection(&mut self, connection_id: ConnectionId) { + self.sender + .send(InnerTransportEvent::ConnectionClosed { + peer: self.peer, + connection: connection_id, + }) + .await + .unwrap(); + } + + async fn expect_peer_closed_event(&mut self) { + if let Some(TransportEvent::ConnectionClosed { + peer: disconnected_peer, + }) = self.service.next().await + { + assert_eq!(disconnected_peer, self.peer); + } else { + panic!("expected event from `TransportService`"); + }; + } + + async fn expect_secondary_open_substream(&mut self, expected_substream_id: SubstreamId) { + match self.secondary_commands.recv().await.expect("secondary command") { + ProtocolCommand::OpenSubstream { + connection_id, + substream_id, + .. + } => { + assert_eq!(connection_id, self.secondary_connection_id); + assert_eq!(substream_id, expected_substream_id); + } + command => panic!("expected open substream command, got {command:?}"), + } + } + } + #[tokio::test] async fn secondary_connection_stored() { let (mut service, sender, _) = transport_service(); @@ -1080,6 +1505,70 @@ mod tests { assert!(cmd_rx2.try_recv().is_err()); } + #[tokio::test] + async fn open_substream_promotes_live_secondary_when_primary_handle_is_stale() { + let mut harness = TwoConnectionHarness::new().await; + + harness.downgrade_primary(); + harness.assert_primary_handle_is_stale(); + harness.assert_connection_layout( + harness.primary_connection_id(), + Some(harness.secondary_connection_id()), + ); + + let substream_id = harness.open_substream().expect("secondary to be promoted"); + + harness.assert_connection_layout(harness.secondary_connection_id(), None); + harness.assert_keep_alive_tracks(&[harness.secondary_connection_id()]); + harness.expect_secondary_open_substream(substream_id).await; + harness.assert_no_primary_command(); + } + + #[tokio::test] + async fn open_substream_keeps_peer_until_real_close_when_primary_and_secondary_are_stale() { + let mut harness = TwoConnectionHarness::new().await; + + harness.downgrade_primary(); + harness.downgrade_secondary(); + + assert_eq!( + harness.open_substream(), + Err(SubstreamError::ConnectionClosed) + ); + harness.assert_connection_layout( + harness.primary_connection_id(), + Some(harness.secondary_connection_id()), + ); + harness.assert_no_keep_alive(); + harness.assert_no_primary_command(); + harness.assert_no_secondary_command(); + + harness.close_connection(harness.primary_connection_id()).await; + poll_service(&mut harness.service).await; + + harness.close_connection(harness.secondary_connection_id()).await; + harness.expect_peer_closed_event().await; + harness.assert_peer_removed(); + } + + #[tokio::test] + async fn open_substream_retries_live_secondary_when_primary_send_fails_closed() { + let mut harness = TwoConnectionHarness::new().await; + + // Simulate the race where `try_get_permit()` still succeeds on the active sender but + // the underlying connection task is already gone by the time the command is sent. + harness.drop_primary_command_receiver(); + + let substream_id = harness + .open_substream() + .expect("secondary should recover after primary send failure"); + + harness.assert_connection_layout(harness.secondary_connection_id(), None); + harness.assert_keep_alive_tracks(&[harness.secondary_connection_id()]); + harness.assert_keep_alive_does_not_track(harness.primary_connection_id()); + harness.expect_secondary_open_substream(substream_id).await; + } + #[tokio::test] async fn keep_alive_timeout_expires_for_a_stale_connection() { let _ = tracing_subscriber::fmt()