diff --git a/bin/network-monitor/assets/index.css b/bin/network-monitor/assets/index.css index 30275f5b2..69947fd7d 100644 --- a/bin/network-monitor/assets/index.css +++ b/bin/network-monitor/assets/index.css @@ -297,6 +297,7 @@ body { justify-content: center; transition: all 0.2s; color: var(--color-text-faint); + flex-shrink: 0; } .copy-button:hover { @@ -429,12 +430,16 @@ body { /* When the value carries an inline action (currently only the copy button), keep them on the same line. Without this `.metric-value` is inline, and the whitespace between the value text - and the button is a wrap opportunity — so long values (URLs in particular) push the button - onto its own row below. */ + and the button is a wrap opportunity, so long values (URLs in particular) push the button + onto its own row below. `min-width: 0` plus `overflow-wrap: anywhere` let values with no + natural break points (e.g. URLs without hyphens) wrap inside the card instead of overflowing + it and dragging the button outside the box. 
*/ .metric-value:has(.copy-button) { display: inline-flex; align-items: center; gap: 4px; + min-width: 0; + overflow-wrap: anywhere; } .metric-value.warning-delta, diff --git a/bin/network-monitor/src/commands/start.rs b/bin/network-monitor/src/commands/start.rs index e5fbb0b0c..29e89c5e1 100644 --- a/bin/network-monitor/src/commands/start.rs +++ b/bin/network-monitor/src/commands/start.rs @@ -35,11 +35,7 @@ pub async fn start_monitor(config: MonitorConfig) -> Result<()> { let rpc_rx = tasks.spawn_rpc_checker(&config); - let prover_rxs = if config.remote_prover_urls.is_empty() { - Vec::new() - } else { - tasks.spawn_prover_tasks(&config).await - }; + let prover_rxs = tasks.spawn_prover_tasks(&config); let faucet_rx = config.faucet_url.is_some().then(|| tasks.spawn_faucet(&config)); @@ -48,7 +44,7 @@ pub async fn start_monitor(config: MonitorConfig) -> Result<()> { let (ntx_increment_rx, ntx_tracking_rx) = if config.disable_ntx_service { (None, None) } else { - let (increment_rx, tracking_rx) = tasks.spawn_ntx_service(&config).await?; + let (increment_rx, tracking_rx) = tasks.spawn_ntx_service(&config); (Some(increment_rx), Some(tracking_rx)) }; diff --git a/bin/network-monitor/src/counter.rs b/bin/network-monitor/src/counter.rs index 4c4fffe65..607203bcd 100644 --- a/bin/network-monitor/src/counter.rs +++ b/bin/network-monitor/src/counter.rs @@ -149,6 +149,10 @@ pub struct IncrementService { } impl IncrementService { + /// Display name of the service, shared with the bootstrap seeding code in + /// [`crate::monitor::tasks`]. 
+ pub const NAME: &'static str = "Local Transactions"; + pub async fn new( config: MonitorConfig, wallet_account: Account, @@ -330,7 +334,7 @@ impl IncrementService { impl Service for IncrementService { fn name(&self) -> &'static str { - "Local Transactions" + Self::NAME } fn interval(&self) -> Duration { @@ -413,6 +417,10 @@ pub struct CounterTrackingService { } impl CounterTrackingService { + /// Display name of the service, shared with the bootstrap seeding code in + /// [`crate::monitor::tasks`]. + pub const NAME: &'static str = "Network Transactions"; + pub async fn new( config: MonitorConfig, counter_receiver: watch::Receiver, @@ -552,7 +560,7 @@ impl CounterTrackingService { impl Service for CounterTrackingService { fn name(&self) -> &'static str { - "Network Transactions" + Self::NAME } fn interval(&self) -> Duration { @@ -664,15 +672,15 @@ fn build_increment_status(details: &IncrementDetails, last_error: Option let service_details = ServiceDetails::NtxIncrement(details.clone()); if let Some(err) = last_error { - ServiceStatus::unhealthy("Local Transactions", err, service_details) + ServiceStatus::unhealthy(IncrementService::NAME, err, service_details) } else if details.success_count == 0 && details.failure_count > 0 { ServiceStatus::unhealthy( - "Local Transactions", + IncrementService::NAME, format!("no successful increments ({} failures)", details.failure_count), service_details, ) } else { - ServiceStatus::healthy("Local Transactions", service_details) + ServiceStatus::healthy(IncrementService::NAME, service_details) } } @@ -694,7 +702,7 @@ fn build_tracking_status( let service_details = ServiceDetails::NtxTracking(details.clone()); if let Some(err) = last_error { - return ServiceStatus::unhealthy("Network Transactions", err, service_details); + return ServiceStatus::unhealthy(CounterTrackingService::NAME, err, service_details); } if over_threshold_streak >= PENDING_UNHEALTHY_CONFIRMATION_POLLS { @@ -703,13 +711,13 @@ fn build_tracking_status( "counter 
trailing expected by {pending} (> {threshold}) for {over_threshold_streak} \ consecutive polls", ); - return ServiceStatus::unhealthy("Network Transactions", err, service_details); + return ServiceStatus::unhealthy(CounterTrackingService::NAME, err, service_details); } if details.current_value.is_some() { - ServiceStatus::healthy("Network Transactions", service_details) + ServiceStatus::healthy(CounterTrackingService::NAME, service_details) } else { - ServiceStatus::unknown("Network Transactions", service_details) + ServiceStatus::unknown(CounterTrackingService::NAME, service_details) } } diff --git a/bin/network-monitor/src/deploy/mod.rs b/bin/network-monitor/src/deploy/mod.rs index 0033d5b10..a1f819936 100644 --- a/bin/network-monitor/src/deploy/mod.rs +++ b/bin/network-monitor/src/deploy/mod.rs @@ -50,9 +50,10 @@ pub mod wallet; /// /// At startup the monitor may come up before the node's RPC endpoint is accepting connections, so /// the eager `connect()` (and the follow-up `get_block_header_by_number` request) is retried with -/// exponential backoff instead of aborting the binary on the first refused connection. The schedule -/// is bounded so a genuinely unreachable or misconfigured endpoint still surfaces as a fatal error -/// rather than hanging forever. +/// exponential backoff instead of failing on the first refused connection. The schedule is bounded +/// so a single handshake attempt returns within a few minutes; callers that must survive a +/// genuinely unreachable endpoint (e.g. the NTX bootstrap in `monitor::tasks`) wrap it in their +/// own unbounded retry loop. 
const GENESIS_DISCOVERY_BACKOFF_INITIAL: Duration = Duration::from_secs(1); const GENESIS_DISCOVERY_BACKOFF_MAX: Duration = Duration::from_secs(30); const GENESIS_DISCOVERY_MAX_RETRIES: usize = 10; diff --git a/bin/network-monitor/src/faucet.rs b/bin/network-monitor/src/faucet.rs index 8e08a6a58..2cf28fc10 100644 --- a/bin/network-monitor/src/faucet.rs +++ b/bin/network-monitor/src/faucet.rs @@ -3,10 +3,11 @@ //! This module contains the logic for periodically testing faucet functionality //! by requesting proof-of-work challenges, solving them, and submitting token requests. -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::Context; use hex; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use reqwest::Client; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -88,6 +89,8 @@ pub struct FaucetService { url: Url, client: Client, interval: Duration, + /// Wall-clock cap on solving a single `PoW` challenge. + solve_timeout: Duration, /// A valid public account ID used as the recipient for faucet token requests. Generated once at /// construction from a throwaway wallet account; the minted tokens are never spent. 
account_id: String, @@ -109,6 +112,7 @@ impl FaucetService { url, client, interval, + solve_timeout: request_timeout, account_id: wallet_account.id().to_string(), success_count: 0, failure_count: 0, @@ -145,7 +149,9 @@ impl Service for FaucetService { let start_time = std::time::Instant::now(); let mut last_error: Option = None; - match perform_faucet_test(&self.client, &self.url, &self.account_id).await { + match perform_faucet_test(&self.client, &self.url, &self.account_id, self.solve_timeout) + .await + { Ok((minted_tokens, metadata)) => { self.success_count += 1; self.last_tx_id = Some(minted_tokens.tx_id.clone()); @@ -198,6 +204,7 @@ async fn perform_faucet_test( client: &Client, faucet_url: &Url, account_id: &str, + solve_timeout: Duration, ) -> anyhow::Result<(GetTokensResponse, GetMetadataResponse)> { debug!("Using recipient account ID: {} (length: {})", account_id, account_id.len()); @@ -210,7 +217,7 @@ async fn perform_faucet_test( let response = client.get(pow_url).send().await?; - let response_text: String = response.text().await?; + let response_text = read_success_body(response).await.context("/pow request failed")?; debug!("Faucet PoW response: {}", response_text); let challenge_response: PowChallengeResponse = @@ -222,9 +229,16 @@ async fn perform_faucet_test( &challenge_response.challenge[..16.min(challenge_response.challenge.len())] ); - // Step 2: Solve the PoW challenge - let nonce = solve_pow_challenge(&challenge_response.challenge, challenge_response.target) - .context("Failed to solve PoW challenge")?; + // Step 2: Solve the PoW challenge off the async runtime; hashing is CPU-bound and would + // otherwise stall every other checker task scheduled on this worker thread. + let challenge = challenge_response.challenge.clone(); + let target = challenge_response.target; + let nonce = spawn_blocking_in_current_span(move || { + solve_pow_challenge(&challenge, target, solve_timeout) + }) + .await + .context("PoW solver task panicked")? 
+ .context("Failed to solve PoW challenge")?; debug!("Solved PoW challenge with nonce: {}", nonce); @@ -240,7 +254,7 @@ async fn perform_faucet_test( let response = client.get(tokens_url).send().await?; - let response_text: String = response.text().await?; + let response_text = read_success_body(response).await.context("/get_tokens request failed")?; debug!("Faucet /get_tokens response: {}", response_text); let tokens_response: GetTokensResponse = @@ -251,7 +265,8 @@ async fn perform_faucet_test( let response = client.get(metadata_url).send().await?; - let response_text = response.text().await?; + let response_text = + read_success_body(response).await.context("/get_metadata request failed")?; debug!("Faucet /get_metadata response: {}", response_text); let metadata: GetMetadataResponse = @@ -260,6 +275,16 @@ async fn perform_faucet_test( Ok((tokens_response, metadata)) } +/// Reads the response body, failing with the HTTP status code and body when the request was not +/// successful, so server-side errors (e.g. 429 or 500) surface directly on the card instead of as a +/// deserialization failure. +async fn read_success_body(response: reqwest::Response) -> anyhow::Result { + let status = response.status(); + let body = response.text().await?; + anyhow::ensure!(status.is_success(), "HTTP {status}: {body}"); + Ok(body) +} + /// Deserialize a faucet response using [`serde_path_to_error`] so that the failing JSON path (e.g. /// `max_supply`, `explorer_url`) is included in the error message. Combined with /// `#[serde(deny_unknown_fields)]` on each response type, this means renamed, removed, or newly @@ -274,15 +299,19 @@ where /// Solves a proof-of-work challenge using SHA-256 hashing. /// +/// This is CPU-bound and must run on a blocking thread (see the `spawn_blocking` call site). +/// /// # Arguments /// /// * `challenge` - The challenge string in hexadecimal format. /// * `target` - The target value. A solution is valid if H(challenge, nonce) < target. 
+/// * `timeout` - Wall-clock cap; checked every 100k attempts so a pathological difficulty cannot +/// pin the blocking thread indefinitely. /// /// # Returns /// -/// The nonce that solves the challenge, or an error if no solution is found within reasonable -/// bounds. +/// The nonce that solves the challenge, or an error if no solution is found within the attempt +/// and time bounds. #[instrument( parent = None, target = COMPONENT, @@ -292,8 +321,9 @@ where ret(level = "debug"), err )] -fn solve_pow_challenge(challenge: &str, target: u64) -> anyhow::Result { +fn solve_pow_challenge(challenge: &str, target: u64, timeout: Duration) -> anyhow::Result { let challenge_bytes = hex::decode(challenge).context("Failed to decode challenge from hex")?; + let started = Instant::now(); // Try up to 100 million nonces. for nonce in 0..MAX_CHALLENGE_ATTEMPTS { @@ -316,8 +346,15 @@ fn solve_pow_challenge(challenge: &str, target: u64) -> anyhow::Result { return Ok(nonce); } - // Log progress every 100k attempts + // Check the deadline and log progress every 100k attempts if nonce % 100_000 == 0 && nonce > 0 { + let elapsed = started.elapsed(); + if elapsed >= timeout { + anyhow::bail!( + "Failed to solve PoW challenge within {timeout:?} ({nonce} attempts, target \ + {target})" + ); + } debug!( "PoW attempt {}: current_hash={}, target={} (~{} bits)", nonce, diff --git a/bin/network-monitor/src/monitor/tasks.rs b/bin/network-monitor/src/monitor/tasks.rs index 8374264a3..debf83f1d 100644 --- a/bin/network-monitor/src/monitor/tasks.rs +++ b/bin/network-monitor/src/monitor/tasks.rs @@ -2,8 +2,10 @@ use std::sync::Arc; use std::sync::atomic::AtomicU64; +use std::time::Duration; use anyhow::Result; +use backon::{ExponentialBuilder, Retryable}; use miden_node_proto::clients::RemoteProverClient; use miden_node_utils::tasks::Tasks as SupervisedTasks; use tokio::sync::watch::Receiver; @@ -18,9 +20,15 @@ use crate::explorer::ExplorerService; use crate::faucet::FaucetService; use 
crate::frontend::{ServerState, serve}; use crate::note_transport::NoteTransportService; -use crate::remote_prover::{ProbeSnapshot, ProverStatusService, generate_prover_test_payload}; +use crate::remote_prover::ProverStatusService; use crate::service::{Service, build_tls_client}; -use crate::status::{RpcService, ServiceStatus}; +use crate::status::{ + CounterTrackingDetails, + IncrementDetails, + RpcService, + ServiceDetails, + ServiceStatus, +}; use crate::validator::ValidatorService; /// Task management structure that supervises named component tasks. @@ -86,44 +94,24 @@ impl Tasks { /// Spawn prover status tasks for all configured provers. /// /// Each prover is monitored by a [`ProverStatusService`] that polls on the status cadence. - /// The first time it observes the prover reporting `ProofType::Transaction`, the status - /// service spawns a detached probe task that runs proof-test probes on the test cadence. - pub async fn spawn_prover_tasks( - &mut self, - config: &MonitorConfig, - ) -> Vec> { - // Build the proof-test payload once and share it across all provers. If it can't be built - // (e.g. RPC unreachable at startup), prover status is still monitored without proof - // probing. - let payload = match generate_prover_test_payload(&config.rpc_url).await { - Ok(payload) => Some(payload), - Err(e) => { - warn!( - target: COMPONENT, - error = %e, - "failed to build remote-prover probe payload" - ); - None - }, - }; - + /// Once it observes the prover reporting `ProofType::Transaction`, the status service spawns + /// (and keeps alive) a probe task that acquires its test payload from the RPC and runs + /// proof-test probes on the test cadence. 
+ pub fn spawn_prover_tasks(&mut self, config: &MonitorConfig) -> Vec> { let mut prover_rxs = Vec::new(); for (i, prover_url) in config.remote_prover_urls.iter().enumerate() { let name = format!("Remote Prover ({})", i + 1); - let (probe_tx, probe_rx) = watch::channel(ProbeSnapshot::default()); let test_client = build_tls_client::(prover_url.clone(), config.request_timeout); let status_svc = ProverStatusService::new( name, prover_url.clone(), + config.rpc_url.clone(), config.status_check_interval, config.request_timeout, config.remote_prover_test_interval, - probe_tx, - probe_rx, test_client, - payload.clone(), ); prover_rxs.push(self.spawn_service(status_svc)); } @@ -140,44 +128,28 @@ impl Tasks { /// Spawn the network transaction service checker task. /// - /// Creates a fresh wallet/counter pair in memory, deploys the counter to the network, and - /// hands the same counter account to both services via a [`watch::channel`]. The increment - /// service publishes new counters on the channel when it regenerates accounts after - /// persistent failures; the tracking service observes the channel to switch over. - pub async fn spawn_ntx_service( + /// Returns the two status receivers immediately, seeded with an unknown "deploying monitor + /// accounts" status, and bootstraps the services in a supervised background task so that a + /// slow or unreachable RPC neither delays the dashboard nor aborts the monitor (see + /// [`run_ntx`]). 
+ pub fn spawn_ntx_service( &mut self, config: &MonitorConfig, - ) -> Result<(Receiver, Receiver)> { - let (wallet_account, secret_key, counter_account) = - create_and_deploy_accounts(&config.rpc_url).await?; - - let (counter_tx, counter_rx) = watch::channel(counter_account.clone()); - - let expected_counter_value = Arc::new(AtomicU64::new(0)); - let latency_state = Arc::new(Mutex::new(LatencyState::default())); - - let increment_svc = IncrementService::new( - config.clone(), - wallet_account, - secret_key, - counter_account, - counter_tx, - Arc::clone(&expected_counter_value), - latency_state.clone(), - ) - .await?; - let tracking_svc = CounterTrackingService::new( - config.clone(), - counter_rx, - Arc::clone(&expected_counter_value), - latency_state, - ) - .await?; - - let increment_rx = self.spawn_service(increment_svc); - let tracking_rx = self.spawn_service(tracking_svc); - - Ok((increment_rx, tracking_rx)) + ) -> (Receiver, Receiver) { + let (increment_tx, increment_rx) = watch::channel(ntx_seed_status( + IncrementService::NAME, + ServiceDetails::NtxIncrement(IncrementDetails::default()), + )); + let (tracking_tx, tracking_rx) = watch::channel(ntx_seed_status( + CounterTrackingService::NAME, + ServiceDetails::NtxTracking(CounterTrackingDetails::default()), + )); + + let config = config.clone(); + self.handles.spawn_infallible("ntx", run_ntx(config, increment_tx, tracking_tx)); + debug!(target: COMPONENT, service = "ntx", "spawned service"); + + (increment_rx, tracking_rx) } /// Spawns a [`Service`] and returns its `ServiceStatus` receiver. @@ -208,3 +180,103 @@ impl Tasks { self.handles.join_next_as_error().await } } + +// NTX BOOTSTRAP +// ================================================================================================ + +/// Seed status published on the NTX channels until the accounts are deployed. 
+fn ntx_seed_status(name: &str, details: ServiceDetails) -> ServiceStatus { + let mut status = ServiceStatus::unknown(name, details); + status.error = Some("deploying monitor accounts".to_string()); + status +} + +/// Bootstraps the network transaction services and runs them. +/// +/// Deployment is retried forever with exponential backoff, publishing an unhealthy status on both +/// channels after each failed attempt, so a network that is down at startup degrades the cards +/// instead of aborting the monitor. Once bootstrapped, both services run on separate tasks; if +/// either exits or panics, this supervised task ends and [`Tasks::handle_failure`] treats it as +/// fatal, the same semantics they had when spawned directly. +async fn run_ntx( + config: MonitorConfig, + increment_tx: watch::Sender, + tracking_tx: watch::Sender, +) { + let backoff = ExponentialBuilder::default() + .with_min_delay(Duration::from_secs(1)) + .with_max_delay(Duration::from_secs(30)) + .with_factor(2.0) + .with_jitter() + .without_max_times(); + + let (increment_svc, tracking_svc) = (|| async { bootstrap_ntx(&config).await }) + .retry(backoff) + .notify(|err: &anyhow::Error, sleep: Duration| { + warn!( + target: COMPONENT, + err = ?err, + sleep_ms = sleep.as_millis() as u64, + "NTX bootstrap failed; retrying after backoff", + ); + let msg = format!("deploying monitor accounts failed: {err:#}"); + increment_tx.send_replace(ServiceStatus::unhealthy( + IncrementService::NAME, + &msg, + ServiceDetails::NtxIncrement(IncrementDetails::default()), + )); + tracking_tx.send_replace(ServiceStatus::unhealthy( + CounterTrackingService::NAME, + &msg, + ServiceDetails::NtxTracking(CounterTrackingDetails::default()), + )); + }) + .await + .expect("unbounded retry only resolves on success"); + + // Run the services on their own tasks (a shared task would serialize the increment service's + // local proving with the tracking polls). 
The first one to finish ends this supervised task; + // the JoinSet aborts the other on drop. + let mut services = tokio::task::JoinSet::new(); + services.spawn(increment_svc.run(increment_tx)); + services.spawn(tracking_svc.run(tracking_tx)); + services.join_next().await; +} + +/// One bootstrap attempt: create and deploy fresh accounts, then build both services. +/// +/// Creates a fresh wallet/counter pair in memory, deploys the counter to the network, and hands +/// the same counter account to both services via a [`watch::channel`]. The increment service +/// publishes new counters on the channel when it regenerates accounts after persistent failures; +/// the tracking service observes the channel to switch over. +async fn bootstrap_ntx( + config: &MonitorConfig, +) -> Result<(IncrementService, CounterTrackingService)> { + let (wallet_account, secret_key, counter_account) = + create_and_deploy_accounts(&config.rpc_url).await?; + + let (counter_tx, counter_rx) = watch::channel(counter_account.clone()); + + let expected_counter_value = Arc::new(AtomicU64::new(0)); + let latency_state = Arc::new(Mutex::new(LatencyState::default())); + + let increment_svc = IncrementService::new( + config.clone(), + wallet_account, + secret_key, + counter_account, + counter_tx, + Arc::clone(&expected_counter_value), + latency_state.clone(), + ) + .await?; + let tracking_svc = CounterTrackingService::new( + config.clone(), + counter_rx, + Arc::clone(&expected_counter_value), + latency_state, + ) + .await?; + + Ok((increment_svc, tracking_svc)) +} diff --git a/bin/network-monitor/src/remote_prover.rs b/bin/network-monitor/src/remote_prover.rs index 9599e0393..19bf14630 100644 --- a/bin/network-monitor/src/remote_prover.rs +++ b/bin/network-monitor/src/remote_prover.rs @@ -3,9 +3,11 @@ //! A prover is monitored by up to two tasks: //! - [`ProverStatusService`] (impl [`Service`]): polls the proxy status endpoint on the status //! 
cadence and publishes the public [`ServiceStatus`] by merging in the latest probe outcome. -//! - [`run_prover_test`] (spawned lazily by the status service): runs proof-test probes on the -//! longer test cadence and publishes a private [`ProbeSnapshot`]. Only spawned the first time the -//! status service observes the prover reporting [`ProofType::Transaction`]. +//! - [`run_prover_test`] (spawned by the status service): acquires the proof-test payload from the +//! RPC, retrying until it succeeds, then runs proof-test probes on the longer test cadence and +//! publishes a private [`ProbeSnapshot`]. First spawned when the status service observes the +//! prover reporting [`ProofType::Transaction`]; respawned if it ever terminates, and its +//! snapshot is reported as stale once it stops updating. use std::time::{Duration, Instant}; @@ -14,9 +16,10 @@ use miden_node_proto::generated as proto; use miden_protocol::utils::serde::Serializable; use serde::{Deserialize, Serialize}; use tokio::sync::watch; +use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; use tonic::Request; -use tracing::{debug, instrument}; +use tracing::{debug, instrument, warn}; use url::Url; use crate::COMPONENT; @@ -39,16 +42,8 @@ pub enum ProofType { Transaction, Block, Batch, -} - -impl From for proto::remote_prover::ProofType { - fn from(value: ProofType) -> Self { - match value { - ProofType::Transaction => proto::remote_prover::ProofType::Transaction, - ProofType::Block => proto::remote_prover::ProofType::Block, - ProofType::Batch => proto::remote_prover::ProofType::Batch, - } - } + /// The prover reported a proof type this monitor version does not know about. 
+ Unknown, } impl From for ProofType { @@ -89,84 +84,121 @@ pub struct ProbeSnapshot { // PROVER STATUS SERVICE // ================================================================================================ -/// Parameters captured at construction time for spawning the probe task lazily, the first time the +/// Parameters captured at construction time for spawning (and respawning) the probe task once the /// status service observes the prover reporting [`ProofType::Transaction`]. struct ProbeSpawner { client: RemoteProverClient, - payload: proto::remote_prover::ProofRequest, + rpc_url: Url, interval: Duration, probe_tx: watch::Sender, name: String, } +impl ProbeSpawner { + /// Spawns a probe task and returns its handle. + fn spawn(&self) -> JoinHandle<()> { + tokio::spawn(run_prover_test( + self.client.clone(), + self.rpc_url.clone(), + self.interval, + self.probe_tx.clone(), + self.name.clone(), + )) + } +} + /// Polls the remote prover's proxy status endpoint and publishes the combined [`ServiceStatus`] -/// (status + latest probe outcome). Spawns the probe task the first time the prover reports -/// Transaction type. +/// (status + latest probe outcome). Spawns the probe task once the prover reports Transaction type +/// and keeps it alive from then on. pub struct ProverStatusService { name: String, url: String, client: RemoteProverProxyStatusClient, interval: Duration, + request_timeout: Duration, last_status: Option, last_status_err: Option, probe_rx: watch::Receiver, - probe_spawner: Option, + probe_spawner: ProbeSpawner, + probe_handle: Option>, + /// When the most recent [`ProbeSnapshot`] change was observed; used to flag stale probe data. 
+ last_probe_change: Option, } impl ProverStatusService { - #[expect(clippy::too_many_arguments)] pub fn new( name: String, prover_url: Url, + rpc_url: Url, interval: Duration, request_timeout: Duration, probe_interval: Duration, - probe_tx: watch::Sender, - probe_rx: watch::Receiver, test_client: RemoteProverClient, - payload: Option, ) -> Self { let url = prover_url.to_string(); let client = build_tls_client::(prover_url, request_timeout); - // Without a probe payload (e.g. RPC was unreachable at startup) the prover status is still - // polled, but no proof-test probe is armed. - let probe_spawner = payload.map(|payload| ProbeSpawner { + let (probe_tx, probe_rx) = watch::channel(ProbeSnapshot::default()); + let probe_spawner = ProbeSpawner { client: test_client, - payload, + rpc_url, interval: probe_interval, probe_tx, name: name.clone(), - }); + }; Self { name, url, client, interval, + request_timeout, last_status: None, last_status_err: None, probe_rx, probe_spawner, + probe_handle: None, + last_probe_change: None, } } - /// Spawns the probe task if the prover has just been observed to support Transaction proofs and - /// we haven't spawned it yet. No-op in all other cases. - fn maybe_spawn_probe(&mut self) { + /// Keeps the probe task alive once the prover has been observed to support Transaction proofs. + /// + /// Spawns the task on the first Transaction-type observation and respawns it if it ever + /// terminates. The task only ends by panicking (its snapshot channel outlives it), so a + /// finished handle is surfaced as an unhealthy outcome before respawning. 
+ fn ensure_probe_running(&mut self) { let Some(status) = &self.last_status else { return }; if !matches!(status.supported_proof_type, ProofType::Transaction) { return; } - let Some(spawner) = self.probe_spawner.take() else { - return; - }; - debug!(target: COMPONENT, prover = %self.name, "spawning probe task"); - tokio::spawn(run_prover_test( - spawner.client, - spawner.payload, - spawner.interval, - spawner.probe_tx, - spawner.name, - )); + match &self.probe_handle { + None => { + debug!(target: COMPONENT, prover = %self.name, "spawning probe task"); + self.probe_handle = Some(self.probe_spawner.spawn()); + }, + Some(handle) if handle.is_finished() => { + warn!( + target: COMPONENT, + prover = %self.name, + "probe task terminated unexpectedly; respawning" + ); + self.probe_spawner.probe_tx.send_modify(|snapshot| { + snapshot.failure_count += 1; + snapshot.latest = Some(ProverTestOutcome { + details: ProverTestDetails { + test_duration_ms: 0, + proof_size_bytes: 0, + success_count: snapshot.success_count, + failure_count: snapshot.failure_count, + proof_type: ProofType::Transaction, + }, + status: Status::Unhealthy, + error: Some("probe task terminated unexpectedly; respawning".to_string()), + }); + }); + self.probe_handle = Some(self.probe_spawner.spawn()); + }, + Some(_) => {}, + } } /// Classifies the current status + probe state into a [`ServiceStatus`]. @@ -178,17 +210,23 @@ impl ProverStatusService { return status; }; + let test_outcome = classify_probe_outcome( + probe.latest.clone(), + self.last_probe_change.map(|changed| changed.elapsed()), + probe_staleness_window(self.probe_spawner.interval, self.request_timeout), + ); + let details = ServiceDetails::RemoteProverStatus(RemoteProverDetails { status: status_details.clone(), - test: probe.latest.clone(), + test: test_outcome.clone(), }); - // Most recent status poll failed — report unhealthy but keep last known status details. 
+ // Most recent status poll failed; report unhealthy but keep last known status details. if let Some(err) = &self.last_status_err { return ServiceStatus::unhealthy(&self.name, err.clone(), details); } - if let Some(outcome) = &probe.latest { + if let Some(outcome) = &test_outcome { if outcome.status == Status::Unhealthy { let msg = outcome.error.clone().unwrap_or_else(|| "prover test failed".to_string()); return ServiceStatus::unhealthy(&self.name, msg, details); @@ -252,18 +290,63 @@ impl Service for ProverStatusService { self.last_status_err = Some(e.to_string()); }, } - self.maybe_spawn_probe(); - let probe = self.probe_rx.borrow().clone(); + self.ensure_probe_running(); + if self.probe_rx.has_changed().unwrap_or(false) { + self.last_probe_change = Some(Instant::now()); + } + let probe = self.probe_rx.borrow_and_update().clone(); self.build_status(&probe) } } +// PROBE OUTCOME CLASSIFICATION +// ================================================================================================ + +/// Window after which a probe outcome with no fresh updates is considered stale. +/// +/// Two full probe intervals plus the request timeout: a healthy probe task publishes at least +/// once per interval, and a single in-flight `prove()` call cannot outlive the request timeout. +fn probe_staleness_window(probe_interval: Duration, request_timeout: Duration) -> Duration { + probe_interval * 2 + request_timeout +} + +/// Re-classifies a probe outcome as stale when no snapshot update has been observed within the +/// staleness window. +/// +/// Staleness is an observability gap, not a prover failure, so the outcome is degraded to +/// [`Status::Unknown`] (which does not flip the prover card to unhealthy) instead of +/// [`Status::Unhealthy`]. 
+fn classify_probe_outcome( + outcome: Option, + elapsed_since_change: Option, + staleness_window: Duration, +) -> Option { + let outcome = outcome?; + match elapsed_since_change { + Some(elapsed) if elapsed > staleness_window => Some(ProverTestOutcome { + status: Status::Unknown, + error: Some(format!("stale: no probe update for {}s", elapsed.as_secs())), + details: outcome.details, + }), + _ => Some(outcome), + } +} + // PROBE TASK // ================================================================================================ -/// Runs proof-test probes on the configured interval. The task is spawned by -/// [`ProverStatusService::maybe_spawn_probe`] only after the prover has been observed to support -/// Transaction proofs. +/// Delay between payload-acquisition attempts. Each attempt is already bounded internally by the +/// genesis-discovery backoff, so this only paces consecutive failed attempts. +const PAYLOAD_RETRY_DELAY: Duration = Duration::from_secs(30); + +/// Runs proof-test probes on the configured interval. The task is spawned (and respawned) by +/// [`ProverStatusService::ensure_probe_running`] only after the prover has been observed to +/// support Transaction proofs. +/// +/// The probe payload is acquired from the RPC first, retrying until it succeeds, so an RPC that +/// is unreachable at spawn time delays probing instead of permanently disarming it. Acquisition +/// failures are published as [`Status::Unknown`] outcomes: they are an RPC problem, not a prover +/// failure. 
#[instrument( parent = None, target = COMPONENT, @@ -274,14 +357,47 @@ impl Service for ProverStatusService { )] async fn run_prover_test( mut client: RemoteProverClient, - payload: proto::remote_prover::ProofRequest, + rpc_url: Url, interval: Duration, probe_tx: watch::Sender<ProbeSnapshot>, name: String, ) { + let payload = loop { + if probe_tx.is_closed() { + debug!(target: COMPONENT, prover = %name, "probe channel closed, exiting probe task"); + return; + } + match generate_prover_test_payload(&rpc_url).await { + Ok(payload) => break payload, + Err(e) => { + warn!( + target: COMPONENT, + prover = %name, + error = ?e, + "failed to build remote-prover probe payload; retrying" + ); + probe_tx.send_modify(|snapshot| { + snapshot.latest = Some(ProverTestOutcome { + details: ProverTestDetails { + test_duration_ms: 0, + proof_size_bytes: 0, + success_count: snapshot.success_count, + failure_count: snapshot.failure_count, + proof_type: ProofType::Transaction, + }, + status: Status::Unknown, + error: Some(format!("building probe payload failed: {e:#}")), + }); + }); + tokio::time::sleep(PAYLOAD_RETRY_DELAY).await; + }, + } + }; + let mut timer = tokio::time::interval(interval); timer.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut state = ProbeSnapshot::default(); + // Start from the last published snapshot so a respawned task keeps the running counters.
+ let mut state = probe_tx.borrow().clone(); loop { timer.tick().await; @@ -374,7 +490,7 @@ fn tonic_status_to_json(status: &tonic::Status) -> String { ret(level = "debug"), err )] -pub(crate) async fn generate_prover_test_payload( +async fn generate_prover_test_payload( rpc_url: &Url, ) -> anyhow::Result<proto::remote_prover::ProofRequest> { let tx_inputs = crate::deploy::build_probe_transaction_inputs(rpc_url).await?; @@ -383,3 +499,84 @@ pub(crate) async fn generate_prover_test_payload( payload: tx_inputs.to_bytes(), }) } + +// TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + fn outcome(status: Status, error: Option<&str>) -> ProverTestOutcome { + ProverTestOutcome { + details: ProverTestDetails { + test_duration_ms: 50, + proof_size_bytes: 1024, + success_count: 3, + failure_count: 1, + proof_type: ProofType::Transaction, + }, + status, + error: error.map(str::to_string), + } + } + + const WINDOW: Duration = Duration::from_secs(100); + + #[test] + fn fresh_outcome_passes_through() { + let classified = classify_probe_outcome( + Some(outcome(Status::Healthy, None)), + Some(Duration::from_secs(10)), + WINDOW, + ) + .unwrap(); + assert_eq!(classified.status, Status::Healthy); + assert!(classified.error.is_none()); + } + + #[test] + fn stale_outcome_degrades_to_unknown() { + let classified = classify_probe_outcome( + Some(outcome(Status::Healthy, None)), + Some(Duration::from_secs(101)), + WINDOW, + ) + .unwrap(); + assert_eq!(classified.status, Status::Unknown); + let err = classified.error.expect("stale outcome should carry an error note"); + assert!(err.contains("stale"), "got: {err}"); + // Numeric details from the last real probe are preserved. + assert_eq!(classified.details.success_count, 3); + } + + #[test] + fn stale_unhealthy_outcome_degrades_to_unknown() { + // A stale failure must not keep the card unhealthy forever; staleness wins.
+ let classified = classify_probe_outcome( + Some(outcome(Status::Unhealthy, Some("prove failed"))), + Some(Duration::from_secs(500)), + WINDOW, + ) + .unwrap(); + assert_eq!(classified.status, Status::Unknown); + } + + #[test] + fn missing_outcome_stays_missing() { + assert!(classify_probe_outcome(None, Some(Duration::from_secs(500)), WINDOW).is_none()); + } + + #[test] + fn outcome_without_observed_change_passes_through() { + let classified = + classify_probe_outcome(Some(outcome(Status::Healthy, None)), None, WINDOW).unwrap(); + assert_eq!(classified.status, Status::Healthy); + } + + #[test] + fn staleness_window_scales_with_interval_and_timeout() { + let window = probe_staleness_window(Duration::from_secs(120), Duration::from_secs(10)); + assert_eq!(window, Duration::from_secs(250)); + } +} diff --git a/bin/network-monitor/src/service_status.rs b/bin/network-monitor/src/service_status.rs index cde1cead8..c184b519c 100644 --- a/bin/network-monitor/src/service_status.rs +++ b/bin/network-monitor/src/service_status.rs @@ -9,7 +9,9 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use miden_node_proto::generated as proto; use miden_node_proto::generated::rpc::{BlockProducerStatus, RpcStatus}; use serde::{Deserialize, Serialize}; +use tracing::warn; +use crate::COMPONENT; use crate::faucet::FaucetTestDetails; use crate::remote_prover::{ProofType, ProverTestDetails}; @@ -254,8 +256,8 @@ pub struct BlockProducerStatusDetails { pub status: Status, /// The block producer's current view of the chain tip height. pub chain_tip: u32, - /// Mempool statistics for this block producer. - pub mempool: MempoolStatusDetails, + /// Mempool statistics for this block producer; `None` when the node did not report them. + pub mempool: Option<MempoolStatusDetails>, } /// Details about the block producer's mempool.
@@ -306,28 +308,39 @@ pub struct NetworkStatus { impl From<BlockProducerStatus> for BlockProducerStatusDetails { fn from(value: BlockProducerStatus) -> Self { - // We assume all supported nodes expose mempool statistics. - let mempool_stats = value - .mempool_stats - .expect("block producer status must include mempool statistics"); + // Mempool statistics are a message field, hence optional on the wire; a node version that + // omits them must not bring the checker down. + let mempool = value.mempool_stats.map(|stats| MempoolStatusDetails { + unbatched_transactions: stats.unbatched_transactions, + proposed_batches: stats.proposed_batches, + proven_batches: stats.proven_batches, + }); Self { version: value.version, status: value.status.into(), chain_tip: value.chain_tip, - mempool: MempoolStatusDetails { - unbatched_transactions: mempool_stats.unbatched_transactions, - proposed_batches: mempool_stats.proposed_batches, - proven_batches: mempool_stats.proven_batches, - }, + mempool, } } } impl From<proto::remote_prover::ProxyWorkerStatus> for WorkerStatusDetails { fn from(value: proto::remote_prover::ProxyWorkerStatus) -> Self { - let status = - proto::remote_prover::WorkerHealthStatus::try_from(value.status).unwrap().into(); + // An out-of-range discriminant (e.g. from a newer prover version) degrades to Unknown + // instead of panicking the checker task. + let status = proto::remote_prover::WorkerHealthStatus::try_from(value.status).map_or_else( + |_| { + warn!( + target: COMPONENT, + raw = value.status, + worker = %value.name, + "unknown worker health status discriminant" + ); + Status::Unknown + }, + Status::from, + ); Self { name: value.name, @@ -339,9 +352,20 @@ impl From<proto::remote_prover::ProxyWorkerStatus> for WorkerStatusDetails { impl RemoteProverStatusDetails { pub fn from_proxy_status(status: proto::remote_prover::ProxyStatus, url: String) -> Self { + // An out-of-range discriminant (e.g. from a newer prover version) degrades to Unknown + // instead of panicking the checker task.
let proof_type = proto::remote_prover::ProofType::try_from(status.supported_proof_type) - .unwrap() - .into(); + .map_or_else( + |_| { + warn!( + target: COMPONENT, + raw = status.supported_proof_type, + "unknown supported proof type discriminant" + ); + ProofType::Unknown + }, + ProofType::from, + ); let workers: Vec<WorkerStatusDetails> = status.workers.into_iter().map(WorkerStatusDetails::from).collect(); @@ -381,3 +405,54 @@ pub fn current_unix_timestamp_secs() -> u64 { .unwrap_or_else(|_| Duration::from_secs(0)) .as_secs() } + +// TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn block_producer_status_without_mempool_stats_does_not_panic() { + let proto_status = BlockProducerStatus { + version: "1.0.0".to_string(), + status: "HEALTHY".to_string(), + chain_tip: 7, + mempool_stats: None, + }; + let details = BlockProducerStatusDetails::from(proto_status); + assert!(details.mempool.is_none()); + assert_eq!(details.status, Status::Healthy); + assert_eq!(details.chain_tip, 7); + } + + #[test] + fn worker_status_with_unknown_discriminant_degrades_to_unknown() { + let proto_status = proto::remote_prover::ProxyWorkerStatus { + name: "worker-1".to_string(), + version: "1.0".to_string(), + status: 99, + }; + let details = WorkerStatusDetails::from(proto_status); + assert_eq!(details.status, Status::Unknown); + assert_eq!(details.name, "worker-1"); + } + + #[test] + fn proxy_status_with_unknown_proof_type_degrades_to_unknown() { + let proto_status = proto::remote_prover::ProxyStatus { + version: "1.0".to_string(), + supported_proof_type: 99, + workers: vec![proto::remote_prover::ProxyWorkerStatus { + name: "worker-1".to_string(), + version: "1.0".to_string(), + status: proto::remote_prover::WorkerHealthStatus::Healthy.into(), + }], + }; + let details = RemoteProverStatusDetails::from_proxy_status(proto_status, "url".to_string()); +
assert!(matches!(details.supported_proof_type, ProofType::Unknown)); + assert_eq!(details.workers.len(), 1); + assert_eq!(details.workers[0].status, Status::Healthy); + } +} diff --git a/bin/network-monitor/src/view/cards/rpc.rs b/bin/network-monitor/src/view/cards/rpc.rs index 0ba6f85e9..84772d9eb 100644 --- a/bin/network-monitor/src/view/cards/rpc.rs +++ b/bin/network-monitor/src/view/cards/rpc.rs @@ -32,7 +32,6 @@ pub(in crate::view) fn render_rpc_status(details: &RpcStatusDetails) -> Markup { } } @if let Some(block_producer) = &details.block_producer_status { - @let mempool = &block_producer.mempool; div class="nested-status" { div class="detail-item" { strong { "Block Producer" } } (metric_row("Version:", &block_producer.version)) @@ -40,9 +39,15 @@ pub(in crate::view) fn render_rpc_status(details: &RpcStatusDetails) -> Markup { (metric_row("Chain Tip:", &block_producer.chain_tip.to_string())) div class="nested-status mempool-stats" { strong { "Mempool stats:" } - (metric_row("Unbatched TXs:", &mempool.unbatched_transactions.to_string())) - (metric_row("Proposed Batches:", &mempool.proposed_batches.to_string())) - (metric_row("Proven Batches:", &mempool.proven_batches.to_string())) + @if let Some(mempool) = &block_producer.mempool { + (metric_row("Unbatched TXs:", &mempool.unbatched_transactions.to_string())) + (metric_row("Proposed Batches:", &mempool.proposed_batches.to_string())) + (metric_row("Proven Batches:", &mempool.proven_batches.to_string())) + } @else { + (metric_row("Unbatched TXs:", "-")) + (metric_row("Proposed Batches:", "-")) + (metric_row("Proven Batches:", "-")) + } } } } diff --git a/bin/network-monitor/src/view/helpers.rs b/bin/network-monitor/src/view/helpers.rs index 204807bf8..4829d691c 100644 --- a/bin/network-monitor/src/view/helpers.rs +++ b/bin/network-monitor/src/view/helpers.rs @@ -90,6 +90,7 @@ pub(super) fn proof_type_label(proof_type: &ProofType) -> &'static str { ProofType::Transaction => "Transaction", ProofType::Block => 
"Block", ProofType::Batch => "Batch", + ProofType::Unknown => "Unknown", } } diff --git a/bin/network-monitor/src/view/mod.rs b/bin/network-monitor/src/view/mod.rs index df381c86d..3e58462dd 100644 --- a/bin/network-monitor/src/view/mod.rs +++ b/bin/network-monitor/src/view/mod.rs @@ -313,11 +313,11 @@ mod tests { version: "1.2.3".to_string(), status: Status::Healthy, chain_tip: 42, - mempool: MempoolStatusDetails { + mempool: Some(MempoolStatusDetails { unbatched_transactions: 1, proposed_batches: 2, proven_batches: 3, - }, + }), }), } }