Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions crates/cluster-client/zisk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,26 @@ impl ZiskClusterClient {
*self.verifier.program_vk()
}

/// Runs the setup step for this client's ELF.
///
/// A clone of the underlying client and a copy of the ELF are handed to the
/// free `setup` helper, which is awaited to completion.
///
/// # Errors
///
/// Propagates any error produced by the `setup` helper.
pub async fn setup(&self) -> Result<(), Error> {
    // Work on a clone so the call can take `&mut` while `self` stays shared.
    let mut client = self.client.clone();
    let elf = self.elf.clone();
    setup(&mut client, elf).await?;
    Ok(())
}

/// Submits a prove job and returns its `job_id` immediately, without waiting for completion.
pub async fn create_prove_job(&self, input: &Input) -> Result<String, Error> {
let mut client = self.client.clone();
let req = JobRequestMessage {
job_kind: Some(prove_job(&self.hash_id, input)),
};
let job_id = client.job_request(req).await?.into_inner().job_id;
let job_id = match client.job_request(req).await {
Ok(res) => res.into_inner().job_id,
Err(status) if status.message().contains("setup not done") => Err(Error::SetupNotDone)?,
Err(status) if matches!(status.code(), Code::Unavailable | Code::Internal) => {
Err(Error::ClusterUnavailable(status))?
}
Err(status) => Err(Error::Grpc(status))?,
};
Ok(job_id)
}

Expand Down Expand Up @@ -105,38 +118,29 @@ impl ZiskClusterClient {
///
/// Retries prove job submission every 5 seconds until the deadline.
///
/// Returns `Error::ClusterUnavailable` if the deadline expires before the job submission.
/// Returns `Error::CreateProveJobTimeout` if the deadline expires before the job submission.
pub async fn prove(
&self,
input: &Input,
deadline: Instant,
) -> Result<(ZiskProof, Duration), Error> {
let fut = async {
let mut client = self.client.clone();
let job = prove_job(&self.hash_id, input);
loop {
let req = JobRequestMessage {
job_kind: Some(job.clone()),
};
match client.job_request(req).await {
Ok(res) => return Ok(res.into_inner().job_id),
Err(status) if status.message().contains("setup not done") => {
setup(&mut client, self.elf.clone()).await?;
}
Err(status) if matches!(status.code(), Code::Unavailable | Code::Internal) => {
match self.create_prove_job(input).await {
Ok(job_id) => return Ok(job_id),
Err(Error::SetupNotDone) => self.setup().await?,
Err(Error::ClusterUnavailable(status)) => {
warn!(?status, "job submission failed, retrying...");
sleep(Duration::from_secs(5)).await;
}
Err(status) => {
return Err(Error::Grpc(status));
}
Err(err) => return Err(err),
};
}
};

let job_id = match timeout_at(deadline, fut).await {
Ok(result) => result?,
Err(_) => Err(Error::ClusterUnavailable)?,
Err(_) => Err(Error::CreateProveJobTimeout)?,
};

match timeout_at(deadline, self.wait_prove_job(&job_id)).await {
Expand Down
10 changes: 8 additions & 2 deletions crates/cluster-client/zisk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,24 @@ pub enum Error {
#[error("Failed to connect to cluster: {0}")]
ConnectionFailed(#[from] tonic::transport::Error),

#[error("Cluster setup is not done yet")]
SetupNotDone,

#[error("Cluster job {job_id} failed: {reason}")]
JobFailed { job_id: String, reason: String },

#[error("Cluster job {0} was cancelled")]
JobCancelled(String),

#[error("Cluster unavailable timed out")]
ClusterUnavailable,
#[error("Cluster unavailable: {0}")]
ClusterUnavailable(tonic::Status),

#[error("Setup job {job_id} timed out")]
SetupTimeout { job_id: String },

#[error("Create prove job timeout")]
CreateProveJobTimeout,

#[error("Prove job {job_id} timed out")]
ProveTimeout { job_id: String },

Expand Down
Loading