diff --git a/bin/ntx-builder/src/actor/execute.rs b/bin/ntx-builder/src/actor/execute.rs index 4fced98c8..0dfb3c2ac 100644 --- a/bin/ntx-builder/src/actor/execute.rs +++ b/bin/ntx-builder/src/actor/execute.rs @@ -11,6 +11,7 @@ use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::Word; use miden_protocol::account::{ Account, + AccountDelta, AccountId, AccountStorageHeader, PartialAccount, @@ -120,10 +121,18 @@ fn log_transient_retry(operation: &'static str, err: &E, s } /// The result of a successful transaction execution. -/// -/// Contains the transaction ID, any notes that failed during filtering, and note scripts fetched -/// from the remote RPC service that should be persisted to the local DB cache. -pub type NtxExecutionResult = (TransactionId, Vec, Vec<(Word, NoteScript)>); +pub struct NtxExecutionResult { + /// ID of the submitted transaction. + pub tx_id: TransactionId, + /// The account delta the transaction produced, applied to the actor's in-memory account once + /// the transaction lands. + pub account_delta: AccountDelta, + /// Notes that failed during consumability filtering. + pub failed_notes: Vec, + /// Note scripts fetched from the remote RPC service that should be persisted to the local DB + /// cache. + pub fetched_scripts: Vec<(Word, NoteScript)>, +} // NETWORK TRANSACTION CONTEXT // ================================================================================================ @@ -230,9 +239,9 @@ impl NtxContext { /// /// # Returns /// - /// On success, returns an [`NtxExecutionResult`] containing the transaction ID, any notes - /// that failed during filtering, and note scripts fetched from the remote RPC service that - /// should be persisted to the local DB cache. + /// On success, returns an [`NtxExecutionResult`] containing the transaction ID, the account + /// delta the transaction produced, any notes that failed during filtering, and note scripts + /// fetched from the remote RPC service that should be persisted to the local DB cache. /// /// # Errors /// @@ -297,6 +306,10 @@ impl NtxContext { .await .unwrap_or_else(|err| std::panic::resume_unwind(err.into_panic()))?; + // Capture the account delta before the executed tx is consumed; the actor applies + // it to its in-memory account once this transaction lands in a committed block. + let account_delta = executed_tx.account_delta().clone(); + // Prove transaction. let tx_inputs: TransactionInputs = executed_tx.into(); let proven_tx = Box::pin(self.prove(&tx_inputs)).await?; @@ -304,7 +317,12 @@ impl NtxContext { // Submit transaction through the RPC service. self.submit(&proven_tx, &tx_inputs).await?; - Ok((proven_tx.id(), failed_notes, scripts_to_cache)) + Ok(NtxExecutionResult { + tx_id: proven_tx.id(), + account_delta, + failed_notes, + fetched_scripts: scripts_to_cache, + }) }) .in_current_span() .await diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index ea929cb53..6c3eebfe9 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -13,7 +13,7 @@ use futures::FutureExt; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; use miden_protocol::Word; -use miden_protocol::account::AccountId; +use miden_protocol::account::{Account, AccountDelta, AccountId}; use miden_protocol::block::BlockNumber; use miden_protocol::note::{NoteScript, Nullifier}; use miden_protocol::transaction::{TransactionId, TransactionScript}; @@ -194,14 +194,19 @@ enum ActorMode { NotesAvailable, /// A network transaction has been submitted; the actor waits for it to land in a committed /// block. Landing is detected from the local DB: `apply_committed_block` records the - /// transaction id that updated each network account as `accounts.last_tx_id`, so the actor only - /// has to check whether its own submitted id is the account's latest. + /// transaction id that updated each network account as `accounts.last_tx_id`, so the actor + /// checks whether its own submitted id is the account's latest. On landing it applies + /// `pending_delta` to its in-memory account, avoiding a re-read of the full account from the + /// database. WaitForBlock { /// Id of the network transaction the actor submitted. submitted_tx_id: TransactionId, /// Chain tip block number at submission. With [`ActorConfig::tx_expiration_delta`] this /// bounds how long the actor waits before retrying. submitted_at: BlockNumber, + /// The account delta the submitted transaction produced, applied to the in-memory account + /// once the transaction lands. + pending_delta: AccountDelta, }, } @@ -225,7 +230,8 @@ enum ActorMode { /// /// ## Lifecycle /// -/// 1. **Initialization**: Waits for committed account state, then checks DB for available notes. +/// 1. **Initialization**: Loads the committed account state (guaranteed to exist, since the +/// coordinator only spawns actors for committed accounts), then checks DB for available notes. /// 2. **Event Loop**: Re-evaluates database state on notification and executes transactions. /// 3. **Transaction Processing**: Selects, executes, proves, and submits transactions through RPC. /// 4. **State Updates**: Committed-chain updates are persisted to DB before actors are @@ -275,16 +281,21 @@ impl AccountActor { /// /// The return value signals the shutdown category to the coordinator: /// - /// - `Ok(())`: intentional shutdown (idle timeout or account not committed in time). + /// - `Ok(())`: intentional shutdown (idle timeout). /// - `Err(_)`: crash (database error, semaphore failure, or any other bug). pub async fn run(self, semaphore: Arc) -> anyhow::Result<()> { let account_id = self.account_id; - // Wait for the account to be committed to the DB. For newly created accounts, the creation - // transaction must be committed before we start processing notes. - if !self.wait_for_committed_account(account_id).await? { - return Ok(()); - } + // Load the account once and keep it in memory for the actor's lifetime, advancing it from + // the delta of each transaction the actor itself lands. The coordinator only spawns actors + // for accounts whose creation has been committed, so the account must exist. + let mut account = self + .state + .db + .get_account(account_id) + .await + .context("failed to load committed account")? + .context("no committed state for the account; the coordinator must only spawn actors for committed accounts")?; // Determine initial mode by checking the DB for available notes. let block_num = self.state.chain.chain_tip_block_number(); @@ -316,16 +327,17 @@ impl AccountActor { }; tokio::select! { - // A committed block touched this account (or the coordinator woke everyone). + // A committed block touched this account (or the coordinator woke everyone): the + // submission may have landed (advancing the in-memory account by its own delta), + // the submission may have expired, or new notes may be available. _ = self.notify.notified() => { - mode = self.reevaluate_mode(account_id, mode).await?; + mode = self.reevaluate_mode(&mut account, mode).await?; }, // Execute a transaction once a permit is available. permit = tx_permit_acquisition => { let _permit = permit.context("semaphore closed")?; let chain_state = self.state.chain.get_cloned(); - let tx_candidate = - self.select_candidate_from_db(account_id, chain_state).await?; + let tx_candidate = self.select_candidate(&account, chain_state).await?; mode = match tx_candidate { Some(candidate) => self.execute_transactions(account_id, candidate).await, None => ActorMode::NoViableNotes, @@ -340,74 +352,103 @@ impl AccountActor { } } - /// Decides the actor's next mode after a coordinator notification. + /// Decides the actor's next mode after a coordinator notification, advancing the in-memory + /// account when the actor's own transaction lands. /// /// - In `NoViableNotes`/`NotesAvailable`, a wake means the DB may now have new work; advance to /// `NotesAvailable` and let the next `select_candidate` decide whether a real candidate /// exists. - /// - In `WaitForBlock`, query the latest transaction recorded against the account. If it equals - /// the actor's submitted transaction id, the tx landed; return to `NotesAvailable`. Else, if - /// `tx_expiration_delta` blocks have passed since submission, give up waiting and resume - /// candidate selection; otherwise stay in `WaitForBlock`. + /// - In `WaitForBlock`, query the latest transaction recorded against the account: + /// - If it equals the actor's submitted id, the transaction landed: apply its `pending_delta` + /// to the in-memory account and resume selection. + /// - Else if `tx_expiration_delta` blocks have passed since submission, the submission expired: + /// reload the account from the DB (in case a different transaction changed it while we + /// waited) and resume selection. + /// - Otherwise keep waiting. async fn reevaluate_mode( &self, - account_id: AccountId, + account: &mut Account, mode: ActorMode, ) -> anyhow::Result { - match mode { - ActorMode::WaitForBlock { submitted_tx_id, submitted_at } => { - let landed = self - .state - .db - .account_last_tx(account_id) - .await - .context("failed to check submitted tx landing")? - == Some(submitted_tx_id); - if landed { - return Ok(ActorMode::NotesAvailable); - } + let ActorMode::WaitForBlock { + submitted_tx_id, + submitted_at, + pending_delta, + } = mode + else { + return Ok(ActorMode::NotesAvailable); + }; - let chain_tip = self.state.chain.chain_tip_block_number(); - let elapsed = chain_tip.checked_sub(submitted_at.as_u32()).unwrap_or_default(); - if elapsed.as_u32() >= u32::from(self.config.tx_expiration_delta.get()) { - tracing::info!( - %account_id, - %submitted_at, - current_tip = %chain_tip, - delta = self.config.tx_expiration_delta, - "submitted transaction expired", - ); - return Ok(ActorMode::NotesAvailable); - } + let landed = self + .state + .db + .account_last_tx(self.account_id) + .await + .context("failed to check submitted tx landing")? + == Some(submitted_tx_id); + if landed { + // The landed transaction is the one we executed, so the committed state is exactly our + // in-memory account plus the delta it produced. + account + .apply_delta(&pending_delta) + .context("failed to apply landed transaction delta to in-memory account")?; + tracing::info!( + account_id = %self.account_id, + tx_id = %submitted_tx_id, + "submitted transaction landed; advanced in-memory account by its delta", + ); + return Ok(ActorMode::NotesAvailable); + } - Ok(ActorMode::WaitForBlock { submitted_tx_id, submitted_at }) - }, - _ => Ok(ActorMode::NotesAvailable), + let chain_tip = self.state.chain.chain_tip_block_number(); + let elapsed = chain_tip.checked_sub(submitted_at.as_u32()).unwrap_or_default(); + if elapsed.as_u32() >= u32::from(self.config.tx_expiration_delta.get()) { + tracing::info!( + account_id = %self.account_id, + %submitted_at, + current_tip = %chain_tip, + delta = self.config.tx_expiration_delta, + "submitted transaction expired", + ); + // The submission did not land. Reload the authoritative account in case a different + // transaction changed it while we waited, then resume selection. + if let Some(latest) = self + .state + .db + .get_account(self.account_id) + .await + .context("failed to reload account after submission expiry")? + { + *account = latest; + } + return Ok(ActorMode::NotesAvailable); } + + Ok(ActorMode::WaitForBlock { + submitted_tx_id, + submitted_at, + pending_delta, + }) } - /// Selects a transaction candidate by querying the DB. - async fn select_candidate_from_db( + /// Selects a transaction candidate for the in-memory account by querying its available notes. + async fn select_candidate( &self, - account_id: AccountId, + account: &Account, chain_state: ChainState, ) -> anyhow::Result> { + let account_id = self.account_id; let block_num = chain_state.chain_tip_header.block_num(); let max_notes = self.config.max_notes_per_tx.get(); - let (latest_account, notes) = self + let notes = self .state .db - .select_candidate(account_id, block_num, self.config.max_note_attempts) + .available_notes(account_id, block_num, self.config.max_note_attempts) .await - .context("failed to query DB for transaction candidate")?; + .context("failed to query DB for available notes")?; - let Some(account) = latest_account else { - tracing::info!(account_id = %account_id, "Account no longer exists in DB"); - return Ok(None); - }; - - let partitioned_notes = partition_by_allowlist(&account, notes) + let partitioned_notes = partition_by_allowlist(account, notes) .context("failed to read network account note allowlist")?; if !partitioned_notes.rejected.is_empty() { @@ -434,57 +475,13 @@ impl AccountActor { let (chain_tip_header, chain_mmr) = chain_state.into_parts(); Ok(Some(TransactionCandidate { - account, + account: account.clone(), notes, chain_tip_header, chain_mmr, })) } - /// Waits until a committed account state exists in the DB. - /// - /// For accounts that are being created by an inflight transaction, this will idle - /// until the transaction is committed. Returns `true` when the account is ready, or - /// `false` if no commit arrived within [`ActorConfig::idle_timeout`] — in which case - /// the coordinator will respawn a new actor when a later committed block targets the - /// account again. - async fn wait_for_committed_account(&self, account_id: AccountId) -> anyhow::Result { - // Check if the account is already committed. - if self - .state - .db - .has_committed_account(account_id) - .await - .context("failed to check for committed account")? - { - return Ok(true); - } - - loop { - tokio::select! { - _ = self.notify.notified() => { - if self - .state - .db - .has_committed_account(account_id) - .await - .context("failed to check for committed account")? - { - tracing::info!(account.id=%account_id, "Account committed, starting normal operation"); - return Ok(true); - } - } - _ = tokio::time::sleep(self.config.idle_timeout) => { - tracing::info!( - %account_id, - "Account actor deactivated while waiting for account commit", - ); - return Ok(false); - } - } - } - } - /// Execute a transaction candidate and mark notes as failed as required. /// /// Returns the new actor mode based on the execution result. @@ -525,14 +522,19 @@ impl AccountActor { let execution_result = context.execute_transaction(tx_candidate).await; match execution_result { - Ok((tx_id, failed, scripts_to_cache)) => { + Ok(execute::NtxExecutionResult { + tx_id, + account_delta, + failed_notes: failed, + fetched_scripts, + }) => { tracing::info!( %account_id, %tx_id, num_failed = failed.len(), "network transaction executed with some failed notes", ); - self.cache_note_scripts(scripts_to_cache).await; + self.cache_note_scripts(fetched_scripts).await; // A tx carries work only if at least one candidate note survived consumability // filtering; if every note failed there is nothing on-chain to wait for. @@ -549,6 +551,7 @@ impl AccountActor { ActorMode::WaitForBlock { submitted_tx_id: tx_id, submitted_at: block_num, + pending_delta: account_delta, } } }, @@ -650,7 +653,107 @@ fn log_failed_notes(failed: Vec) -> Vec<(Nullifier, NoteError)> { mod tests { use std::num::NonZeroU16; - use super::expiration_tx_script; + use miden_protocol::ONE; + use miden_protocol::account::{Account, AccountDelta, AccountStorageDelta, AccountVaultDelta}; + use tokio::sync::Notify; + + use super::*; + use crate::db::Db; + use crate::test_utils::{mock_account, mock_network_account_id, mock_transaction_id}; + + /// Builds a valid nonce-only [`AccountDelta`] for `account_id`. + fn nonce_bump_delta(account_id: AccountId) -> AccountDelta { + AccountDelta::new( + account_id, + AccountStorageDelta::default(), + AccountVaultDelta::default(), + ONE, + ) + .expect("a nonce-only delta is valid") + } + + /// Builds an actor wired to `db` for the given account, plus the in-memory account to drive. + fn test_actor(db: &Db, account: &Account) -> AccountActor { + let ctx = AccountActorContext::test(db); + AccountActor::new(account.id(), &ctx, Arc::new(Notify::new())) + } + + /// When the submitted transaction lands (its id is the account's latest committed tx), the + /// actor advances its in-memory account by exactly the delta the transaction produced. + #[tokio::test] + async fn landing_advances_in_memory_account_by_its_delta() { + let (db, _dir) = Db::test_setup().await; + let account = mock_account(mock_network_account_id()); + let account_id = account.id(); + let submitted = mock_transaction_id(7); + + // Seed the committed row so the landing check sees our submission as the latest tx. + db.upsert_account_for_test(account_id, account.clone(), submitted) + .await + .unwrap(); + + let delta = nonce_bump_delta(account_id); + let mut expected = account.clone(); + expected.apply_delta(&delta).unwrap(); + + let actor = test_actor(&db, &account); + let mut in_memory = account.clone(); + let mode = actor + .reevaluate_mode( + &mut in_memory, + ActorMode::WaitForBlock { + submitted_tx_id: submitted, + submitted_at: 0_u32.into(), + pending_delta: delta, + }, + ) + .await + .unwrap(); + + assert!(matches!(mode, ActorMode::NotesAvailable), "a landed tx must resume selection"); + assert_eq!( + in_memory.to_commitment(), + expected.to_commitment(), + "the in-memory account must be advanced by the landed tx's delta", + ); + } + + /// While the submission has neither landed nor expired, the actor keeps waiting and leaves its + /// in-memory account untouched. + #[tokio::test] + async fn pending_submission_keeps_waiting_without_touching_account() { + let (db, _dir) = Db::test_setup().await; + let account = mock_account(mock_network_account_id()); + + // Nothing seeded: `account_last_tx` returns `None`, so the submission has not landed. The + // test chain sits at genesis, well within `tx_expiration_delta`, so it has not expired. + let actor = test_actor(&db, &account); + let mut in_memory = account.clone(); + let submitted = mock_transaction_id(7); + let mode = actor + .reevaluate_mode( + &mut in_memory, + ActorMode::WaitForBlock { + submitted_tx_id: submitted, + submitted_at: 0_u32.into(), + pending_delta: nonce_bump_delta(account.id()), + }, + ) + .await + .unwrap(); + + match mode { + ActorMode::WaitForBlock { submitted_tx_id, .. } => { + assert_eq!(submitted_tx_id, submitted, "the actor must keep waiting on its own tx"); + }, + other => panic!("expected to stay in WaitForBlock, got {other:?}"), + } + assert_eq!( + in_memory.to_commitment(), + account.to_commitment(), + "a still-pending submission must not change the in-memory account", + ); + } /// The expiration script must compile for the full valid delta range, and the delta must be /// baked into the script (distinct deltas → distinct script roots), proving the on-chain diff --git a/bin/ntx-builder/src/builder.rs b/bin/ntx-builder/src/builder.rs index 192ce8454..c5f982fd5 100644 --- a/bin/ntx-builder/src/builder.rs +++ b/bin/ntx-builder/src/builder.rs @@ -138,7 +138,8 @@ impl NetworkTransactionBuilder { } } - // Phase 2: spawn an actor for every account with carry-over pending notes. + // Phase 2: spawn an actor for every account with carry-over pending notes. Accounts whose + // creation has not been committed yet have their spawn deferred by the coordinator. let pending_accounts = loop_db .accounts_with_pending_notes(self.config.max_note_attempts) .await @@ -148,7 +149,7 @@ impl NetworkTransactionBuilder { "spawning actors for accounts with carry-over pending notes", ); for account_id in pending_accounts { - self.coordinator.spawn_actor(account_id); + self.coordinator.spawn_actor_when_committed(account_id).await?; } // Phase 3: drive actors per committed block, plus serialize their DB writes. @@ -175,7 +176,7 @@ impl NetworkTransactionBuilder { let effects = self .apply_committed_block_with_effects(&loop_db, block, committed_tip) .await?; - self.coordinator.handle_committed_block(&effects); + self.coordinator.handle_committed_block(&effects).await?; }, SteadyStateAction::Request(request) => { let Some(request) = request else { diff --git a/bin/ntx-builder/src/committed_block.rs b/bin/ntx-builder/src/committed_block.rs index 9bd0481dc..1d9029bb1 100644 --- a/bin/ntx-builder/src/committed_block.rs +++ b/bin/ntx-builder/src/committed_block.rs @@ -5,6 +5,8 @@ use miden_protocol::note::Nullifier; use miden_protocol::transaction::{OutputNote, TransactionId}; use miden_standards::note::AccountTargetNetworkNote; +use crate::db::models::account_effect::NetworkAccountEffect; + /// Network-relevant state extracted from a committed [`SignedBlock`]. /// /// Produced once per committed block on the ntx-builder side. Downstream code (DB layer, @@ -75,4 +77,18 @@ impl CommittedBlockEffects { account_transactions, } } + + /// Returns the ids of the network accounts created by this block. + /// + /// The coordinator uses this to release actor spawns that were deferred until the account's + /// creation transaction committed. + pub fn created_network_accounts(&self) -> impl Iterator + '_ { + self.network_account_updates.iter().filter_map(|(account_id, details)| { + matches!( + NetworkAccountEffect::from_protocol(details), + Some(NetworkAccountEffect::Created(_)) + ) + .then_some(*account_id) + }) + } } diff --git a/bin/ntx-builder/src/coordinator.rs b/bin/ntx-builder/src/coordinator.rs index ae4273e76..bade0b8e9 100644 --- a/bin/ntx-builder/src/coordinator.rs +++ b/bin/ntx-builder/src/coordinator.rs @@ -1,7 +1,9 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use anyhow::Context; use miden_protocol::account::AccountId; +use miden_standards::note::AccountTargetNetworkNote; use tokio::sync::{Notify, Semaphore}; use tokio::task::JoinSet; @@ -62,6 +64,10 @@ impl ActorHandle { /// spawns missing actors for accounts that just received new network notes and wakes every /// active actor so it can re-evaluate its state from the DB. /// +/// Actors only operate on committed account state, so spawning is restricted to accounts whose +/// creation has been committed: a spawn requested for a not-yet-committed account is deferred +/// until the block carrying the account's creation arrives. +/// /// Notifications are coalesced through [`Notify`]: multiple wakes while an actor is busy /// collapse into one. Actors that crash repeatedly are deactivated after `max_account_crashes` /// failures. @@ -88,6 +94,12 @@ pub struct Coordinator { /// Maximum number of crashes an account actor is allowed before being deactivated. max_account_crashes: usize, + + /// Accounts targeted by network notes whose creation transaction has not been committed yet. + /// + /// Their actor spawn is deferred until a committed block carries the account's creation, at + /// which point [`Coordinator::handle_committed_block`] promotes them to a real actor. + pending_spawns: HashSet, } impl Coordinator { @@ -105,6 +117,7 @@ impl Coordinator { actor_context, crash_counts: HashMap::new(), max_account_crashes, + pending_spawns: HashSet::new(), } } @@ -146,23 +159,67 @@ impl Coordinator { tracing::info!(account.id = %account_id, "Created actor for account"); } - /// Reacts to a committed block: spawns actors for any newly-targeted network accounts and wakes - /// every active actor so it can re-evaluate its state. - pub fn handle_committed_block(&mut self, effects: &CommittedBlockEffects) { - let mut targeted: HashSet = HashSet::new(); - for note in &effects.network_notes { - targeted.insert(note.target_account_id()); + /// Spawns an actor for the given account if its committed state exists in the DB; otherwise + /// defers the spawn until the block carrying the account's creation arrives. + /// + /// Actors only operate on committed account state, so spawning earlier would only produce an + /// actor idling for the creation transaction to commit. + pub async fn spawn_actor_when_committed( + &mut self, + account_id: AccountId, + ) -> anyhow::Result<()> { + if self.actor_registry.contains_key(&account_id) { + return Ok(()); + } + + let committed = self + .actor_context + .state + .db + .account_exists(account_id) + .await + .context("failed to check for committed account state")?; + + if committed { + self.spawn_actor(account_id); + } else { + tracing::info!( + account.id = %account_id, + "deferring actor spawn until the account's creation is committed", + ); + self.pending_spawns.insert(account_id); } + Ok(()) + } - for account_id in &targeted { - if !self.actor_registry.contains_key(account_id) { - self.spawn_actor(*account_id); + /// Reacts to a committed block: spawns actors for any newly-targeted network accounts whose + /// committed state exists (deferring the rest until their creation commits), releases deferred + /// spawns for accounts created by this block, and wakes every active actor so it can + /// re-evaluate its state. + pub async fn handle_committed_block( + &mut self, + effects: &CommittedBlockEffects, + ) -> anyhow::Result<()> { + // Accounts created by this block release any spawn deferred on their creation. + for account_id in effects.created_network_accounts() { + if self.pending_spawns.remove(&account_id) { + self.spawn_actor(account_id); } } + let targeted: HashSet = effects + .network_notes + .iter() + .map(AccountTargetNetworkNote::target_account_id) + .collect(); + for account_id in targeted { + self.spawn_actor_when_committed(account_id).await?; + } + for handle in self.actor_registry.values() { handle.notify(); } + Ok(()) } /// Waits for the next actor to complete and handles the outcome. @@ -236,12 +293,47 @@ mod tests { coordinator.actor_registry.insert(account_id, ActorHandle::new(notify)); } + /// Seeds a committed row for `account_id` so the coordinator's spawn check sees the account. + async fn seed_committed_account(coordinator: &Coordinator, account_id: AccountId) { + let db = coordinator.actor_context.state.db.clone(); + db.upsert_account_for_test(account_id, mock_account(account_id), mock_transaction_id(0)) + .await + .unwrap(); + } + + #[tokio::test] + async fn handle_committed_block_spawns_for_committed_note_target() { + let (mut coordinator, _dir, _rx) = Coordinator::test().await; + + let target_id = mock_network_account_id(); + seed_committed_account(&coordinator, target_id).await; + + let note = mock_single_target_note(target_id, 10); + let effects = CommittedBlockEffects { + header: mock_block_header(1_u32.into()), + network_notes: vec![note], + nullifiers: vec![], + network_account_updates: vec![], + account_transactions: vec![], + }; + + coordinator.handle_committed_block(&effects).await.unwrap(); + + assert!( + coordinator.actor_registry.contains_key(&target_id), + "a committed account targeted by a note should get a fresh actor", + ); + } + #[tokio::test] - async fn handle_committed_block_spawns_for_unknown_note_target() { + async fn handle_committed_block_defers_spawn_until_account_creation_commits() { let (mut coordinator, _dir, _rx) = Coordinator::test().await; - let unknown_id = mock_network_account_id(); - let note = mock_single_target_note(unknown_id, 10); + let (account, details) = mock_network_account_update(); + let account_id = account.id(); + + // A note targets the account before its creation transaction has been committed. + let note = mock_single_target_note(account_id, 10); let effects = CommittedBlockEffects { header: mock_block_header(1_u32.into()), network_notes: vec![note], @@ -249,12 +341,39 @@ mod tests { network_account_updates: vec![], account_transactions: vec![], }; + coordinator.handle_committed_block(&effects).await.unwrap(); + + assert!( + !coordinator.actor_registry.contains_key(&account_id), + "an account without committed state must not get an actor", + ); + assert!( + coordinator.pending_spawns.contains(&account_id), + "the spawn must be deferred until the account's creation commits", + ); - coordinator.handle_committed_block(&effects); + // The creation commits in a later block; the builder persists the block's effects to the DB + // before handing them to the coordinator. + let db = coordinator.actor_context.state.db.clone(); + db.upsert_account_for_test(account_id, account, mock_transaction_id(0)) + .await + .unwrap(); + let effects = CommittedBlockEffects { + header: mock_block_header(2_u32.into()), + network_notes: vec![], + nullifiers: vec![], + network_account_updates: vec![(account_id, details)], + account_transactions: vec![], + }; + coordinator.handle_committed_block(&effects).await.unwrap(); assert!( - coordinator.actor_registry.contains_key(&unknown_id), - "previously-untouched account targeted by a note should get a fresh actor", + coordinator.actor_registry.contains_key(&account_id), + "the block committing the account's creation must release the deferred spawn", + ); + assert!( + coordinator.pending_spawns.is_empty(), + "a released spawn must leave the pending set", ); } @@ -274,7 +393,7 @@ mod tests { account_transactions: vec![], }; - coordinator.handle_committed_block(&effects); + coordinator.handle_committed_block(&effects).await.unwrap(); assert!( !coordinator.actor_registry.contains_key(&updated_id), @@ -323,6 +442,7 @@ mod tests { let bystander_notify = coordinator.actor_registry.get(&bystander).unwrap().notify.clone(); let target = mock_network_account_id_seeded(42); + seed_committed_account(&coordinator, target).await; let note = mock_single_target_note(target, 10); let effects = CommittedBlockEffects { header: mock_block_header(1_u32.into()), @@ -332,7 +452,7 @@ mod tests { account_transactions: vec![], }; - coordinator.handle_committed_block(&effects); + coordinator.handle_committed_block(&effects).await.unwrap(); assert!( bystander_notify.notified().now_or_never().is_some(), diff --git a/bin/ntx-builder/src/db/mod.rs b/bin/ntx-builder/src/db/mod.rs index 2d6aeafa3..d21f732fd 100644 --- a/bin/ntx-builder/src/db/mod.rs +++ b/bin/ntx-builder/src/db/mod.rs @@ -141,13 +141,12 @@ impl Db { // ============================================================================================ /// Applies the effects of a committed block (account upserts, note inserts, nullifier-driven - /// deletes, and chain-state advancement) in a single transaction. Returns the set of network - /// accounts touched by this block. + /// deletes, and chain-state advancement) in a single transaction. pub async fn apply_committed_block( &self, effects: CommittedBlockEffects, chain_mmr: PartialMmr, - ) -> Result> { + ) -> Result<()> { self.inner .transact("apply_committed_block", move |conn| { queries::apply_committed_block(conn, &effects, &chain_mmr) @@ -179,28 +178,39 @@ impl Db { .await } - /// Returns `true` if a committed account state exists for the given account. - pub async fn has_committed_account(&self, account_id: AccountId) -> Result { + /// Returns `true` if a committed state for the given account is tracked locally. + /// + /// The coordinator uses this to defer actor spawning until the account's creation transaction + /// has been committed. + pub async fn account_exists(&self, account_id: AccountId) -> Result { self.inner - .query("has_committed_account", move |conn| { - Ok(queries::get_account(conn, account_id)?.is_some()) - }) + .query("account_exists", move |conn| queries::account_exists(conn, account_id)) + .await + } + + /// Returns the committed account state for the given network account, if one is tracked locally. + /// + /// Actors load their account once at startup with this and keep it in memory afterwards, + /// advancing it from the committed state the coordinator pushes after each block. + pub async fn get_account( + &self, + account_id: AccountId, + ) -> Result> { + self.inner + .query("get_account", move |conn| queries::get_account(conn, account_id)) .await } - /// Returns the latest account state and available notes for the given account. - pub async fn select_candidate( + /// Returns the notes currently available for consumption by the given account. + pub async fn available_notes( &self, account_id: AccountId, block_num: BlockNumber, max_note_attempts: usize, - ) -> Result<(Option, Vec)> { + ) -> Result> { self.inner - .query("select_candidate", move |conn| { - let account = queries::get_account(conn, account_id)?; - let notes = - queries::available_notes(conn, account_id, block_num, max_note_attempts)?; - Ok((account, notes)) + .query("available_notes", move |conn| { + queries::available_notes(conn, account_id, block_num, max_note_attempts) }) .await } @@ -304,6 +314,22 @@ impl Db { let db = Db::load(db_path).await.expect("test DB load should succeed"); (db, dir) } + + /// Seeds a committed account row (and its `last_tx_id`) for tests that exercise the actor's + /// landing detection without driving a full committed block. + #[cfg(test)] + pub async fn upsert_account_for_test( + &self, + account_id: AccountId, + account: miden_protocol::account::Account, + last_tx_id: TransactionId, + ) -> Result<()> { + self.inner + .transact("test_upsert_account", move |conn| { + queries::upsert_account(conn, account_id, &account, last_tx_id) + }) + .await + } } /// The subset of write operations the builder's event loop performs, bound to a connection pinned @@ -320,7 +346,7 @@ impl LoopDb { &self, effects: CommittedBlockEffects, chain_mmr: PartialMmr, - ) -> Result> { + ) -> Result<()> { self.conn .transact("apply_committed_block", move |conn| { queries::apply_committed_block(conn, &effects, &chain_mmr) @@ -380,7 +406,7 @@ mod tests { let db = Db::load(db_path).await.expect("load should open the bootstrapped database"); assert!( - db.has_committed_account(account_id).await.expect("query should succeed"), + db.get_account(account_id).await.expect("query should succeed").is_some(), "genesis network account should be committed after bootstrap", ); } diff --git a/bin/ntx-builder/src/db/models/queries/accounts.rs b/bin/ntx-builder/src/db/models/queries/accounts.rs index 8cf9b6d7c..6df10cfc4 100644 --- a/bin/ntx-builder/src/db/models/queries/accounts.rs +++ b/bin/ntx-builder/src/db/models/queries/accounts.rs @@ -90,6 +90,26 @@ pub fn account_last_tx( .transpose() } +/// Returns `true` if a committed state for the given account is tracked locally. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT EXISTS (SELECT 1 FROM accounts WHERE account_id = ?1) +/// ``` +pub fn account_exists( + conn: &mut SqliteConnection, + account_id: AccountId, +) -> Result { + let account_id_bytes = conversions::account_id_to_bytes(account_id); + + let exists = + diesel::select(diesel::dsl::exists(schema::accounts::table.find(&account_id_bytes))) + .get_result(conn)?; + + Ok(exists) +} + /// Returns the committed account state for the given network account. /// /// # Raw SQL diff --git a/bin/ntx-builder/src/db/models/queries/mod.rs b/bin/ntx-builder/src/db/models/queries/mod.rs index e7dba3cf2..e66d5e329 100644 --- a/bin/ntx-builder/src/db/models/queries/mod.rs +++ b/bin/ntx-builder/src/db/models/queries/mod.rs @@ -1,6 +1,6 @@ //! Database query functions for the NTX builder. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use diesel::prelude::*; use miden_node_db::DatabaseError; @@ -42,15 +42,13 @@ mod tests; /// - Updates the singleton `chain_state` row's tip with the new block header and the /// post-application chain MMR. /// -/// Returns the set of network accounts that were touched by this block (account-state updates or -/// notes targeting the account). +/// The account upserts apply each block's network-account effects to the local store so the actor's +/// `account_last_tx` landing check and post-expiry reload see the authoritative committed state. pub fn apply_committed_block( conn: &mut SqliteConnection, effects: &CommittedBlockEffects, chain_mmr: &PartialMmr, -) -> Result, DatabaseError> { - let mut affected_accounts: HashSet = HashSet::new(); - +) -> Result<(), DatabaseError> { // The latest transaction in this block per account. For block-producer output every committed // account update originates from a transaction in the same block, so each upserted account has // an entry here. Collecting into a map keeps the last transaction per account (block order is @@ -87,17 +85,13 @@ pub fn apply_committed_block( upsert_account(conn, *account_id, ¤t, last_tx_id)?; }, } - affected_accounts.insert(*account_id); } - for note in &effects.network_notes { - affected_accounts.insert(note.target_account_id()); - } insert_network_notes(conn, &effects.network_notes)?; mark_notes_consumed(conn, &effects.nullifiers, effects.header.block_num())?; update_chain_state_tip(conn, effects.header.block_num(), &effects.header, chain_mmr)?; - Ok(affected_accounts.into_iter().collect()) + Ok(()) }