Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions crates/rpc/rpc/src/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tracing::error;
#[derive(Clone)]
pub struct EthPubSub<Eth> {
/// All nested fields bundled together.
inner: Arc<EthPubSubInner<Eth>>,
pub inner: Arc<EthPubSubInner<Eth>>,
}

// === impl EthPubSub ===
Expand Down Expand Up @@ -86,7 +86,7 @@ where
}

/// The actual handler for an accepted [`EthPubSub::subscribe`] call.
async fn handle_accepted<Eth>(
pub async fn handle_accepted<Eth>(
pubsub: Arc<EthPubSubInner<Eth>>,
accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
Expand Down Expand Up @@ -140,7 +140,7 @@ where
};
std::future::ready(tx_value)
});
return pipe_from_stream(accepted_sink, stream).await
return pipe_from_stream(accepted_sink, stream).await;
}
Params::Bool(false) | Params::None => {
// only hashes requested
Expand Down Expand Up @@ -172,7 +172,7 @@ where
.map_err(SubscriptionSerializeError::new)?;

if accepted_sink.send(msg).await.is_err() {
return Ok(())
return Ok(());
}

while canon_state.next().await.is_some() {
Expand All @@ -192,7 +192,7 @@ where
.map_err(SubscriptionSerializeError::new)?;

if accepted_sink.send(msg).await.is_err() {
break
break;
}
}
}
Expand Down Expand Up @@ -263,8 +263,8 @@ impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
}

/// Container type `EthPubSub`
#[derive(Clone)]
struct EthPubSubInner<EthApi> {
#[derive(Clone, Debug)]
pub struct EthPubSubInner<EthApi> {
/// The `eth` API.
eth_api: EthApi,
/// The type that's used to spawn subscription tasks.
Expand All @@ -278,7 +278,7 @@ where
Eth: RpcNodeCore<Provider: BlockNumReader>,
{
/// Returns the current sync status for the `syncing` subscription
fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
pub fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
if is_syncing {
let current_block = self
.eth_api
Expand All @@ -303,12 +303,12 @@ where
Eth: RpcNodeCore<Pool: TransactionPool>,
{
/// Returns a stream that yields all transaction hashes emitted by the txpool.
fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
pub fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
}

/// Returns a stream that yields all transactions emitted by the txpool.
fn full_pending_transaction_stream(
pub fn full_pending_transaction_stream(
&self,
) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
self.eth_api.pool().new_pending_pool_transactions_listener()
Expand All @@ -320,7 +320,7 @@ where
Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>,
{
/// Returns a stream that yields all new RPC blocks.
fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
pub fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain.committed().headers().collect::<Vec<_>>();
futures::stream::iter(
Expand All @@ -330,7 +330,7 @@ where
}

/// Returns a stream that yields all logs that match the given filter.
fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
pub fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends").block_receipts()
Expand Down
Loading