diff --git a/Cargo.lock b/Cargo.lock index 531e07b0..546669dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3921,6 +3921,7 @@ dependencies = [ "tonic 0.14.5", "tonic-prost", "tonic-prost-build", + "tracing", ] [[package]] diff --git a/crates/cluster-client/zisk/Cargo.toml b/crates/cluster-client/zisk/Cargo.toml index e002fe40..25e88645 100644 --- a/crates/cluster-client/zisk/Cargo.toml +++ b/crates/cluster-client/zisk/Cargo.toml @@ -15,6 +15,7 @@ thiserror.workspace = true tokio = { workspace = true, features = ["time"] } tonic.workspace = true tonic-prost.workspace = true +tracing.workspace = true # Local dependencies ere-compiler-core.workspace = true diff --git a/crates/cluster-client/zisk/src/client.rs b/crates/cluster-client/zisk/src/client.rs index 6a1d2b8d..cf5cc2d6 100644 --- a/crates/cluster-client/zisk/src/client.rs +++ b/crates/cluster-client/zisk/src/client.rs @@ -10,6 +10,7 @@ use ere_verifier_zisk::{ use serde::Deserialize; use tokio::time::{Instant, sleep, timeout, timeout_at}; use tonic::{Code, transport::Channel}; +use tracing::warn; use crate::{ api::{ @@ -62,32 +63,10 @@ impl ZiskClusterClient { /// Submits a prove job and returns its `job_id` immediately, without waiting for completion. pub async fn create_prove_job(&self, input: &Input) -> Result { let mut client = self.client.clone(); - let job = JobKind { - kind: Some(job_kind::Kind::Prove(ProveRequest { - hash_id: self.hash_id.clone(), - input: Some(InputKind { - kind: Some(input_kind::Kind::Inline(InputChunk { - data: framed_stdin(input.stdin()), - })), - }), - proof_dest: ProofKind::StarkMinimal as i32, - proof_timeout: None, - hints: None, - })), - }; let req = JobRequestMessage { - job_kind: Some(job), - }; - let job_id = match client.job_request(req.clone()).await { - Ok(res) => res.into_inner().job_id, - Err(err) - if err.code() == Code::Unavailable && err.message().contains("setup not done") => - { - setup(&mut client, self.elf.clone()).await?; - client.job_request(req).await?.into_inner().job_id - } - Err(err) => Err(err)?, + job_kind: Some(prove_job(&self.hash_id, input)), }; + let job_id = client.job_request(req).await?.into_inner().job_id; Ok(job_id) } @@ -120,15 +99,11 @@ impl ZiskClusterClient { Ok(cancelled) } - /// Submits a prove job, wait up to `timeout` for completion, cancel the job on deadline. + /// Submits a prove job, wait for completion, cancel the job on deadline. /// /// Returns `Error::ProveTimeout` if the deadline expires before the job terminates. /// - /// Retries prove job submission until deadline if cluster response the unavailable errors: - /// - /// - `setup not done` - /// - `no workers connected` - /// - `workers are setting up` + /// Retries prove job submission on every 5 seconds until deadline. /// /// Returns `Error::ClusterUnavailable` if the deadline expires before the job submission. pub async fn prove( @@ -137,23 +112,26 @@ impl ZiskClusterClient { deadline: Instant, ) -> Result<(ZiskProof, Duration), Error> { let fut = async { + let mut client = self.client.clone(); + let job = prove_job(&self.hash_id, input); loop { - let result = self.create_prove_job(input).await; - if let Err(Error::Grpc(status)) = &result - && status.code() == Code::Unavailable - { - if status.message().contains("setup not done") { - setup(&mut self.client.clone(), self.elf.clone()).await?; - continue; - } - if status.message().contains("no workers connected") - || status.message().contains("workers are setting up") - { - sleep(Duration::from_secs(5)).await; - continue; + let req = JobRequestMessage { + job_kind: Some(job.clone()), + }; + let status = match client.job_request(req).await { + Ok(res) => return Ok::<_, Error>(res.into_inner().job_id), + Err(status) => { + if status.code() == Code::Unavailable + && status.message().contains("setup not done") + { + setup(&mut client, self.elf.clone()).await?; + continue; + } + status } - } - return result; + }; + warn!(?status, "job submission failed, retrying..."); + sleep(Duration::from_secs(5)).await; } }; @@ -246,6 +224,22 @@ async fn wait_job( } } +fn prove_job(hash_id: &str, input: &Input) -> JobKind { + JobKind { + kind: Some(job_kind::Kind::Prove(ProveRequest { + hash_id: hash_id.to_string(), + input: Some(InputKind { + kind: Some(input_kind::Kind::Inline(InputChunk { + data: framed_stdin(input.stdin()), + })), + }), + proof_dest: ProofKind::StarkMinimal as i32, + proof_timeout: None, + hints: None, + })), + } +} + /// Returns `data` with a LE u64 length prefix and padding to multiple of 8. /// /// The length prefix and padding is expected by ZisK emulator/prover runtime.