From 6d7a4b623911bdfcb32b059d8da229d0a2cc86aa Mon Sep 17 00:00:00 2001 From: han0110 Date: Mon, 8 Jun 2026 12:33:36 +0000 Subject: [PATCH] refactor: expose setup of zisk cluster client --- crates/cluster-client/zisk/src/client.rs | 38 +++++++++++++----------- crates/cluster-client/zisk/src/error.rs | 10 +++++-- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/crates/cluster-client/zisk/src/client.rs b/crates/cluster-client/zisk/src/client.rs index 496adb2f..0577f7a5 100644 --- a/crates/cluster-client/zisk/src/client.rs +++ b/crates/cluster-client/zisk/src/client.rs @@ -60,13 +60,26 @@ impl ZiskClusterClient { *self.verifier.program_vk() } + /// Setup the ELF. + pub async fn setup(&self) -> Result<(), Error> { + setup(&mut self.client.clone(), self.elf.clone()).await?; + Ok(()) + } + /// Submits a prove job and returns its `job_id` immediately, without waiting for completion. pub async fn create_prove_job(&self, input: &Input) -> Result { let mut client = self.client.clone(); let req = JobRequestMessage { job_kind: Some(prove_job(&self.hash_id, input)), }; - let job_id = client.job_request(req).await?.into_inner().job_id; + let job_id = match client.job_request(req).await { + Ok(res) => res.into_inner().job_id, + Err(status) if status.message().contains("setup not done") => Err(Error::SetupNotDone)?, + Err(status) if matches!(status.code(), Code::Unavailable | Code::Internal) => { + Err(Error::ClusterUnavailable(status))? + } + Err(status) => Err(Error::Grpc(status))?, + }; Ok(job_id) } @@ -105,38 +118,29 @@ impl ZiskClusterClient { /// /// Retries prove job submission on every 5 seconds until deadline. /// - /// Returns `Error::ClusterUnavailable` if the deadline expires before the job submission. + /// Returns `Error::CreateProveJobTimeout` if the deadline expires before the job submission. pub async fn prove( &self, input: &Input, deadline: Instant, ) -> Result<(ZiskProof, Duration), Error> { let fut = async { - let mut client = self.client.clone(); - let job = prove_job(&self.hash_id, input); loop { - let req = JobRequestMessage { - job_kind: Some(job.clone()), - }; - match client.job_request(req).await { - Ok(res) => return Ok(res.into_inner().job_id), - Err(status) if status.message().contains("setup not done") => { - setup(&mut client, self.elf.clone()).await?; - } - Err(status) if matches!(status.code(), Code::Unavailable | Code::Internal) => { + match self.create_prove_job(input).await { + Ok(job_id) => return Ok(job_id), + Err(Error::SetupNotDone) => self.setup().await?, + Err(Error::ClusterUnavailable(status)) => { warn!(?status, "job submission failed, retrying..."); sleep(Duration::from_secs(5)).await; } - Err(status) => { - return Err(Error::Grpc(status)); - } + Err(err) => return Err(err), }; } }; let job_id = match timeout_at(deadline, fut).await { Ok(result) => result?, - Err(_) => Err(Error::ClusterUnavailable)?, + Err(_) => Err(Error::CreateProveJobTimeout)?, }; match timeout_at(deadline, self.wait_prove_job(&job_id)).await { diff --git a/crates/cluster-client/zisk/src/error.rs b/crates/cluster-client/zisk/src/error.rs index 440b2e1e..81f7166a 100644 --- a/crates/cluster-client/zisk/src/error.rs +++ b/crates/cluster-client/zisk/src/error.rs @@ -11,18 +11,24 @@ pub enum Error { #[error("Failed to connect to cluster: {0}")] ConnectionFailed(#[from] tonic::transport::Error), + #[error("Cluster setup is not done yet")] + SetupNotDone, + #[error("Cluster job {job_id} failed: {reason}")] JobFailed { job_id: String, reason: String }, #[error("Cluster job {0} was cancelled")] JobCancelled(String), - #[error("Cluster unavailable timed out")] - ClusterUnavailable, + #[error("Cluster unavailable: {0}")] + ClusterUnavailable(tonic::Status), #[error("Setup job {job_id} timed out")] SetupTimeout { job_id: String }, + #[error("Create prove job timeout")] + CreateProveJobTimeout, + #[error("Prove job {job_id} timed out")] ProveTimeout { job_id: String },