Skip to content

Commit dd44078

Browse files
authored
fix: handle pagination when fetching bundles and txs (#239)
* handle pagination when fetching bundles and txs * version bump to rc 6
1 parent f1b6c4b commit dd44078

5 files changed

Lines changed: 46 additions & 54 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "builder"
3-
version = "1.0.0-rc.5"
3+
version = "1.0.0-rc.6"
44
description = "signet builder example"
55

66
edition = "2024"
@@ -52,7 +52,6 @@ eyre = "0.6.12"
5252
futures-util = "0.3.31"
5353
openssl = { version = "0.10", features = ["vendored"] }
5454
reqwest = { version = "0.12.22", features = ["blocking", "json"] }
55-
serde = { version = "1.0.197", features = ["derive"] }
5655
thiserror = "2.0.17"
5756
tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
5857
tokio-stream = "0.1.17"

src/tasks/cache/bundle.rs

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
//! Bundler service responsible for fetching bundles and sending them to the simulator.
22
use crate::config::BuilderConfig;
33
use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
4-
use signet_tx_cache::{TxCacheError, types::CachedBundle};
4+
use signet_tx_cache::{
5+
TxCacheError,
6+
types::{BundleKey, CachedBundle},
7+
};
58
use tokio::{
69
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
710
task::JoinHandle,
811
time::{self, Duration},
912
};
10-
use tracing::{Instrument, error, trace, trace_span};
13+
use tracing::{Instrument, trace, trace_span, warn};
1114

1215
/// Poll interval for the bundle poller in milliseconds.
1316
const POLL_INTERVAL_MS: u64 = 1000;
@@ -50,25 +53,33 @@ impl BundlePoller {
5053
Duration::from_millis(self.poll_interval_ms)
5154
}
5255

53-
/// Checks the bundle cache for new bundles.
56+
/// Fetches all bundles from the tx-cache, paginating through all available pages.
5457
pub async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> {
55-
let res = self.tx_cache.get_bundles(None).await;
58+
let mut all_bundles = Vec::new();
59+
let mut cursor: Option<BundleKey> = None;
5660

57-
match res {
58-
Ok(resp) => {
59-
let bundles = resp.into_inner();
60-
trace!(count = ?bundles.bundles.len(), "found bundles");
61-
Ok(bundles.bundles)
62-
}
63-
Err(err) => {
64-
if matches!(&err, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) {
65-
trace!("Not our slot to fetch bundles");
66-
} else {
67-
error!(?err, "Failed to fetch bundles from tx-cache");
61+
loop {
62+
let resp = match self.tx_cache.get_bundles(cursor).await {
63+
Ok(resp) => resp,
64+
Err(error) => {
65+
if matches!(&error, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) {
66+
trace!("Not our slot to fetch bundles");
67+
} else {
68+
warn!(%error, "Failed to fetch bundles from tx-cache");
69+
}
70+
return Err(error);
6871
}
69-
Err(err)
70-
}
72+
};
73+
74+
let (bundle_list, next_cursor) = resp.into_parts();
75+
all_bundles.extend(bundle_list.bundles);
76+
77+
let Some(next) = next_cursor else { break };
78+
cursor = Some(next);
7179
}
80+
81+
trace!(count = all_bundles.len(), "fetched all bundles from tx-cache");
82+
Ok(all_bundles)
7283
}
7384

7485
async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {

src/tasks/cache/tx.rs

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,26 @@
1-
//! Transaction service responsible for fetching and sending trasnsactions to the simulator.
1+
//! Transaction service responsible for fetching and sending transactions to the simulator.
22
use crate::config::BuilderConfig;
33
use alloy::{
44
consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable},
55
providers::Provider,
66
};
7-
use eyre::Error;
8-
use reqwest::{Client, Url};
9-
use serde::{Deserialize, Serialize};
7+
use futures_util::TryStreamExt;
8+
use signet_tx_cache::{TxCache, TxCacheError};
109
use std::time::Duration;
1110
use tokio::{sync::mpsc, task::JoinHandle, time};
1211
use tracing::{Instrument, debug, debug_span, trace, trace_span};
1312

1413
/// Poll interval for the transaction poller in milliseconds.
1514
const POLL_INTERVAL_MS: u64 = 1000;
1615

17-
/// Models a response from the transaction pool.
18-
#[derive(Debug, Clone, Serialize, Deserialize)]
19-
struct TxPoolResponse {
20-
/// Holds the transactions property as a list on the response.
21-
transactions: Vec<TxEnvelope>,
22-
}
23-
2416
/// Implements a poller for the block builder to pull transactions from the
2517
/// transaction pool.
2618
#[derive(Debug, Clone)]
2719
pub struct TxPoller {
2820
/// Config values from the Builder.
2921
config: &'static BuilderConfig,
30-
/// Reqwest Client for fetching transactions from the cache.
31-
client: Client,
22+
/// Client for the tx cache.
23+
tx_cache: TxCache,
3224
/// Defines the interval at which the service should poll the cache.
3325
poll_interval_ms: u64,
3426
}
@@ -51,7 +43,8 @@ impl TxPoller {
5143
/// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds.
5244
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
5345
let config = crate::config();
54-
Self { config, client: Client::new(), poll_interval_ms }
46+
let tx_cache = TxCache::new(config.tx_pool_url.clone());
47+
Self { config, tx_cache, poll_interval_ms }
5548
}
5649

5750
/// Returns the poll duration as a [`Duration`].
@@ -98,21 +91,12 @@ impl TxPoller {
9891
});
9992
}
10093

101-
/// Polls the transaction cache for transactions.
102-
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
103-
let url: Url = self.config.tx_pool_url.join("transactions")?;
104-
self.client
105-
.get(url)
106-
.send()
107-
.await?
108-
.error_for_status()?
109-
.json()
110-
.await
111-
.map(|resp: TxPoolResponse| resp.transactions)
112-
.map_err(Into::into)
94+
/// Polls the transaction cache for transactions, paginating through all available pages.
95+
pub async fn check_tx_cache(&self) -> Result<Vec<TxEnvelope>, TxCacheError> {
96+
self.tx_cache.stream_transactions().try_collect().await
11397
}
11498

115-
async fn task_future(mut self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
99+
async fn task_future(self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
116100
loop {
117101
let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url);
118102

@@ -124,12 +108,12 @@ impl TxPoller {
124108
}
125109

126110
if let Ok(transactions) =
127-
self.check_tx_cache().instrument(span.clone()).await.inspect_err(|err| {
128-
debug!(%err, "Error fetching transactions");
111+
self.check_tx_cache().instrument(span.clone()).await.inspect_err(|error| {
112+
debug!(%error, "Error fetching transactions");
129113
})
130114
{
131115
let _guard = span.entered();
132-
trace!(count = ?transactions.len(), "found transactions");
116+
trace!(count = transactions.len(), "found transactions");
133117
for tx in transactions.into_iter() {
134118
self.spawn_check_nonce(tx, outbound.clone());
135119
}

tests/tx_poller_test.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use builder::{
33
tasks::cache::TxPoller,
44
test_utils::{new_signed_tx, setup_logging, setup_test_config},
55
};
6-
// Import the refactored function
76
use eyre::{Ok, Result};
87

98
#[ignore = "integration test"]
@@ -16,9 +15,9 @@ async fn test_tx_roundtrip() -> Result<()> {
1615
post_tx().await?;
1716

1817
// Create a new poller
19-
let mut poller = TxPoller::new();
18+
let poller = TxPoller::new();
2019

21-
// Fetch transactions the pool
20+
// Fetch transactions from the pool
2221
let transactions = poller.check_tx_cache().await?;
2322

2423
// Ensure at least one transaction exists

0 commit comments

Comments
 (0)