From 67cded46b548e88ffcd0be39522e78c0c15b99b4 Mon Sep 17 00:00:00 2001 From: 0xNarumi Date: Mon, 16 Jun 2025 13:33:22 +0900 Subject: [PATCH] pub upstream --- crates/rpc/rpc/src/eth/pubsub.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index b91318d498b..d881849703c 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -35,7 +35,7 @@ use tracing::error; #[derive(Clone)] pub struct EthPubSub { /// All nested fields bundled together. - inner: Arc>, + pub inner: Arc>, } // === impl EthPubSub === @@ -86,7 +86,7 @@ where } /// The actual handler for an accepted [`EthPubSub::subscribe`] call. -async fn handle_accepted( +pub async fn handle_accepted( pubsub: Arc>, accepted_sink: SubscriptionSink, kind: SubscriptionKind, @@ -140,7 +140,7 @@ where }; std::future::ready(tx_value) }); - return pipe_from_stream(accepted_sink, stream).await + return pipe_from_stream(accepted_sink, stream).await; } Params::Bool(false) | Params::None => { // only hashes requested @@ -172,7 +172,7 @@ where .map_err(SubscriptionSerializeError::new)?; if accepted_sink.send(msg).await.is_err() { - return Ok(()) + return Ok(()); } while canon_state.next().await.is_some() { @@ -192,7 +192,7 @@ where .map_err(SubscriptionSerializeError::new)?; if accepted_sink.send(msg).await.is_err() { - break + break; } } } @@ -263,8 +263,8 @@ impl std::fmt::Debug for EthPubSub { } /// Container type `EthPubSub` -#[derive(Clone)] -struct EthPubSubInner { +#[derive(Clone, Debug)] +pub struct EthPubSubInner { /// The `eth` API. eth_api: EthApi, /// The type that's used to spawn subscription tasks. @@ -278,7 +278,7 @@ where Eth: RpcNodeCore, { /// Returns the current sync status for the `syncing` subscription - fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus { + pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus { if is_syncing { let current_block = self .eth_api @@ -303,12 +303,12 @@ where Eth: RpcNodeCore, { /// Returns a stream that yields all transaction hashes emitted by the txpool. - fn pending_transaction_hashes_stream(&self) -> impl Stream { + pub fn pending_transaction_hashes_stream(&self) -> impl Stream { ReceiverStream::new(self.eth_api.pool().pending_transactions_listener()) } /// Returns a stream that yields all transactions emitted by the txpool. - fn full_pending_transaction_stream( + pub fn full_pending_transaction_stream( &self, ) -> impl Stream::Transaction>> { self.eth_api.pool().new_pending_pool_transactions_listener() @@ -320,7 +320,7 @@ where Eth: RpcNodeCore>, { /// Returns a stream that yields all new RPC blocks. - fn new_headers_stream(&self) -> impl Stream> { + pub fn new_headers_stream(&self) -> impl Stream> { self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| { let headers = new_chain.committed().headers().collect::>(); futures::stream::iter( @@ -330,7 +330,7 @@ where } /// Returns a stream that yields all logs that match the given filter. - fn log_stream(&self, filter: Filter) -> impl Stream { + pub fn log_stream(&self, filter: Filter) -> impl Stream { BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state()) .map(move |canon_state| { canon_state.expect("new block subscription never ends").block_receipts()