jetstreamer-source: use firehose StatsTracking for progress logging #263
senzenn wants to merge 2 commits
…StatsTracking

Register upstream `firehose` `StatsTracking` instead of hand-counting blocks in `process_block`. Periodic progress now reports the engine's own aggregates (slots, blocks, transactions, entries, leader-skipped slots) plus TPS, emitted per worker thread (`thread_id` is logged to disambiguate the global aggregates).

- Add `stats_interval_slots` to `JetstreamSourceConfig` (default 10000). `0` disables progress logging and avoids upstream's unguarded `slot % interval`.
- Drop the `blocks_processed: AtomicU64` field and its two debug-logging sites.
- Expose `--stats-interval-slots` on the jetstreamer example.
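A minimal sketch of the `0`-disables behaviour described above, using hypothetical stand-in types (`SourceConfig`, `Tracking`) rather than the real `JetstreamSourceConfig` and `StatsTracking`. The `should_report` helper is also an assumption; it only imitates the shape of an interval check based on `slot % interval`.

```rust
/// Hypothetical stand-in for the real config; only the field discussed here.
pub struct SourceConfig {
    pub stats_interval_slots: u64,
}

impl Default for SourceConfig {
    fn default() -> Self {
        // Default taken from the commit message: report every 10000 slots.
        Self { stats_interval_slots: 10_000 }
    }
}

/// Hypothetical stand-in for the upstream tracking settings.
#[derive(Debug, PartialEq)]
pub struct Tracking {
    pub tracking_interval_slots: u64,
}

/// `0` becomes `None`, so no tracker with a zero interval is ever built.
pub fn tracking_from(config: &SourceConfig) -> Option<Tracking> {
    (config.stats_interval_slots != 0).then_some(Tracking {
        tracking_interval_slots: config.stats_interval_slots,
    })
}

/// Imitates an interval check; the modulo is only reached with a non-zero divisor
/// because `tracking_from` never yields `Some` for an interval of 0.
pub fn should_report(slot: u64, tracking: &Option<Tracking>) -> bool {
    match tracking {
        Some(t) => slot % t.tracking_interval_slots == 0,
        None => false,
    }
}
```

With the default config, `should_report(20_000, ..)` is true and `should_report(20_001, ..)` is false; with `stats_interval_slots: 0` the tracker is `None` and nothing is ever reported.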
    let stats_tracking = (config.stats_interval_slots != 0).then_some(
        jetstreamer_firehose::firehose::StatsTracking {
            on_stats: log_firehose_stats,
            tracking_interval_slots: config.stats_interval_slots,
        },
    );
Suggested change:

    let stats_tracking = config.stats_interval_slots.map(|tracking_interval_slots| {
        jetstreamer_firehose::firehose::StatsTracking {
            on_stats: log_firehose_stats,
            tracking_interval_slots,
        }
    });

If the field becomes optional, we can use `map`, with `None` signaling that logging should be disabled.
`.map` needs the field to be `Option<u64>` (it's `u64` right now), and the `!= 0` check is load-bearing: upstream does `slot % tracking_interval_slots` unguarded ([firehose.rs:1575](https://github.com/rpcpool/yellowstone-vixen/blob/main/crates/jetstreamer-firehose/src/firehose.rs#L1575)), so a `0` has to become `None` or it panics. A plain `.map` passes `Some(0)` straight through and brings the panic back.

Happy to move to `Option<u64>`, just keeping the guard:

    let stats_tracking = config
        .stats_interval_slots
        .filter(|&n| n != 0)
        .map(|tracking_interval_slots| jetstreamer_firehose::firehose::StatsTracking {
            on_stats: log_firehose_stats,
            tracking_interval_slots,
        });

Want it opt-out (default `Some(10_000)`) or opt-in (default `None`)? I'll fix up the defaults and tests to match.
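To make the difference concrete, here is a small standalone comparison of the plain `.map` and the guarded `.filter(..).map(..)`. `IntervalTracking` is a hypothetical stand-in for the real `StatsTracking`, and both helper functions are illustrative names, not part of this PR.

```rust
/// Hypothetical stand-in for the upstream tracking settings.
#[derive(Debug, PartialEq)]
pub struct IntervalTracking {
    pub tracking_interval_slots: u64,
}

/// Plain `map`: `Some(0)` survives and would later reach the modulo as a zero divisor.
pub fn with_plain_map(interval: Option<u64>) -> Option<IntervalTracking> {
    interval.map(|tracking_interval_slots| IntervalTracking { tracking_interval_slots })
}

/// Guarded variant: `filter` turns `Some(0)` into `None` before `map` runs.
pub fn with_guard(interval: Option<u64>) -> Option<IntervalTracking> {
    interval
        .filter(|&n| n != 0)
        .map(|tracking_interval_slots| IntervalTracking { tracking_interval_slots })
}
```

`with_plain_map(Some(0))` yields a tracker with interval 0, while `with_guard(Some(0))` and `with_guard(None)` both yield `None`. In Rust, integer `%` with a zero divisor panics, which is why the guard matters (`10u64.checked_rem(0)` returns `None` for the same reason).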
        thread_id: usize,
        stats: jetstreamer_firehose::firehose::Stats,
    ) -> futures_util::future::BoxFuture<'static, Result<(), SharedError>> {
        async move {
Do we have to return a future or can the function be made async and passed as the callback?
It has to be a boxed future: upstream's `Handler<Data>` trait requires `Fn(usize, Data) -> BoxFuture<'static, HandlerResult>`, so a bare `async fn` won't satisfy the bound. `.boxed()` erases the opaque future into the `BoxFuture` it expects.
Same shape as the existing `on_block`/`on_tx`/`on_entry` callbacks at lib.rs:663–681. Happy to inline it as a closure next to those if you'd prefer.
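A std-only sketch of why the callback returns a boxed future. Everything here is a stand-in: the local `BoxFuture` alias mirrors `futures_util::future::BoxFuture`, `Stats` and the `Result<String, String>` return type are hypothetical, and `Box::pin` is used where the PR calls `.boxed()`. `call_handler` imitates a bound of the same shape as the one quoted above; it is not the upstream `Handler<Data>` trait.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local alias with the same shape as `futures_util::future::BoxFuture`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical stand-in for the stats payload.
#[derive(Debug, Clone, Copy)]
pub struct Stats {
    pub slots_processed: u64,
}

/// Accepts only callables that return a boxed future. An `async fn` returns an
/// opaque `impl Future` instead, so it would not satisfy this bound directly.
pub fn call_handler<H>(
    handler: H,
    thread_id: usize,
    stats: Stats,
) -> BoxFuture<'static, Result<String, String>>
where
    H: Fn(usize, Stats) -> BoxFuture<'static, Result<String, String>>,
{
    handler(thread_id, stats)
}

/// Plain function whose body is an async block, boxed to erase its concrete type.
pub fn log_stats(thread_id: usize, stats: Stats) -> BoxFuture<'static, Result<String, String>> {
    Box::pin(async move {
        Ok(format!("thread {thread_id}: {} slots", stats.slots_processed))
    })
}

/// Waker that does nothing; sufficient for futures that complete on first poll.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once and returns its output if it is already ready.
pub fn poll_once<T>(mut fut: BoxFuture<'static, T>) -> Option<T> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

Passing `log_stats` to `call_handler` compiles because its return type is already the boxed, type-erased future. In real code an executor would drive the future; `poll_once` is only there to keep the sketch free of external crates.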
Co-authored-by: Kyle Espinola <kyle.s.espinola@gmail.com>
The handler kept its own `blocks_processed: AtomicU64` just to log progress every 10k blocks, while `firehose()` already tracks all of this and we were passing `stats_tracking: None`.

- `stats_interval_slots` config field (default 10000, 0 disables)
- `StatsTracking` callback that logs slots/blocks/transactions/entries/leader-skipped + tps; the callback fires per worker thread so `thread_id` is included
- `--stats-interval-slots` to the jetstreamer example

Tested against the old-faithful archive (50 slots of epoch 885, interval 10):